xintongsong commented on code in PR #22807:
URL: https://github.com/apache/flink/pull/22807#discussion_r1247388802
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.Region;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The implementation of {@link PartitionFileReader} with producer-merge mode. In this mode, the
+ * shuffle data is written on the producer side, and the consumer side needs to read multiple
+ * producers to get its partition data.
+ *
+ * <p>Note that one partition file may contain the data of multiple subpartitions.
+ */
+public class ProducerMergedPartitionFileReader implements PartitionFileReader {
+
+    /**
+     * Max number of caches.
+     *
+     * <p>This constant defines the maximum number of caches that can be created. Its value is set
+     * to 10000, which is considered sufficient for most parallel jobs. Each cache only contains
+     * references and numerical variables and occupies a minimal amount of memory, so the value is
+     * not excessively large.
+     */
+    private static final int MAX_CACHE_NUM = 10000;
+
+    /**
+     * Buffer offset caches stored in a map.
+     *
+     * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The
+     * value is the buffer offset cache, which includes the file offset of the buffer index, the
+     * region containing the buffer index, and the next buffer index to consume.
+     */
+    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>
+            bufferOffsetCaches;
+
+    private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    private final Path dataFilePath;
+
+    private final ProducerMergedPartitionFileIndex dataIndex;
+
+    private FileChannel fileChannel;
+
+    /** The current number of caches. */
+    private int numCaches;
+
+    ProducerMergedPartitionFileReader(
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
+        this.dataFilePath = dataFilePath;
+        this.dataIndex = dataIndex;
+        this.bufferOffsetCaches = new HashMap<>();
+    }
+
+    @Override
+    public Buffer readBuffer(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex,
+            MemorySegment memorySegment,
+            BufferRecycler recycler)
+            throws IOException {
+
+        lazyInitializeFileChannel();
+        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
+                Tuple2.of(subpartitionId, bufferIndex);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey);
+        if (!cache.isPresent()) {
+            return null;
+        }
+        fileChannel.position(cache.get().getFileOffset());
+        Buffer buffer =
+                readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
+        boolean hasBuffer =
+                cache.get()
+                        .advance(
+                                checkNotNull(buffer).readableBytes()
+                                        + BufferReaderWriterUtil.HEADER_LENGTH);
+        if (hasBuffer) {
+            int nextBufferIndex = bufferIndex + 1;
+            // TODO: introduce the LRU cache strategy in the future to restrict the total
+            // cache number. Testing to prevent cache leaks has been implemented.
+            if (numCaches < MAX_CACHE_NUM) {
+                bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
+                numCaches++;
+            }
+        }
+        return buffer;
+    }
+
+    @Override
+    public long getPriority(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex) {
+        lazyInitializeFileChannel();
+        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
+                Tuple2.of(subpartitionId, bufferIndex);
+        return tryGetCache(cacheKey).map(BufferOffsetCache::getFileOffset).orElse(Long.MAX_VALUE);
+    }
+
+    @Override
+    public void release() {
+        if (fileChannel != null) {
+            try {
+                fileChannel.close();
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e, "Failed to close file channel.");
+            }
+        }
+        IOUtils.deleteFileQuietly(dataFilePath);
+    }
+
+    /**
+     * Initialize the file channel in a lazy manner, which can reduce usage of the file descriptor
+     * resource.
+     */
+    private void lazyInitializeFileChannel() {
+        if (fileChannel == null) {
+            try {
+                fileChannel = FileChannel.open(dataFilePath, StandardOpenOption.READ);
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e, "Failed to open file channel.");
+            }
+        }
+    }
+
+    /**
+     * Try to get the cache according to the key.
+     *
+     * <p>If the relevant buffer offset cache exists, it will be returned and subsequently removed.
+     * However, if the buffer offset cache does not exist, a new cache will be created using the
+     * data index and returned.
+     *
+     * @param cacheKey the key of the cache.
+     * @return the relevant buffer offset cache if it exists, otherwise {@link Optional#empty()}.
+     */
+    private Optional<BufferOffsetCache> tryGetCache(
+            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey) {
+        BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
+        if (bufferOffsetCache == null) {
+            Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
+            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
+        } else {
+            numCaches--;
+            return Optional.empty();

Review Comment:
   Shouldn't we return `bufferOffsetCache` here?
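   For illustration, a minimal sketch of how I would expect the cache-hit branch to look, assuming the entry removed from the map is safe to hand back to the caller (all names are the ones from the diff above):

   ```java
   private Optional<BufferOffsetCache> tryGetCache(
           Tuple2<TieredStorageSubpartitionId, Integer> cacheKey) {
       BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
       if (bufferOffsetCache == null) {
           // Cache miss: build a new cache entry from the data index, if the index
           // knows a region for this subpartition and buffer index.
           Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
           return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
       } else {
           // Cache hit: the entry was already removed from the map above, so only the
           // counter needs decrementing; returning Optional.empty() here would
           // silently discard the cached offset and next-buffer-index state.
           numCaches--;
           return Optional.of(bufferOffsetCache);
       }
   }
   ```

   With the current `Optional.empty()`, every cache hit falls through to the `!cache.isPresent()` check in `readBuffer` and returns `null`, so the cache would never actually be used.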
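   Separately, since the TODO above mentions moving to an LRU strategy instead of the hard `MAX_CACHE_NUM` cutoff: one lightweight option on the plain JDK is a `LinkedHashMap` in access order with `removeEldestEntry`. Just a sketch of the idea, not something this PR needs; the class name `LruOffsetCacheSketch` and its generic signature are made up for the example, the real map would also keep the remove-on-hit semantics of `tryGetCache`, and `LinkedHashMap` is as unsynchronized as the current `HashMap`:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical LRU-bounded map, sketching the TODO in the diff above. */
   public class LruOffsetCacheSketch<K, V> extends LinkedHashMap<K, V> {

       private final int maxEntries;

       public LruOffsetCacheSketch(int maxEntries) {
           // accessOrder = true: get()/put() move the accessed entry to the tail,
           // so the head of the iteration order is the least-recently-used entry.
           super(16, 0.75f, true);
           this.maxEntries = maxEntries;
       }

       @Override
       protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
           // Invoked by put(); evicting the eldest entry once the cap is exceeded
           // replaces the manual numCaches counter and the MAX_CACHE_NUM check.
           return size() > maxEntries;
       }
   }
   ```

   Usage would be `new LruOffsetCacheSketch<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>(MAX_CACHE_NUM)` in place of the `HashMap` in the constructor.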
