xintongsong commented on code in PR #22807:
URL: https://github.com/apache/flink/pull/22807#discussion_r1247388802


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.Region;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The implementation of {@link PartitionFileReader} with producer-merge mode. In this mode, the
+ * shuffle data is written on the producer side, and the consumer side needs to read from multiple
+ * producers to get its partition data.
+ *
+ * <p>Note that one partition file may contain the data of multiple subpartitions.
+ */
+public class ProducerMergedPartitionFileReader implements PartitionFileReader {
+
+    /**
+     * Maximum number of caches.
+     *
+     * <p>This constant defines the maximum number of caches that can be created. The value is set
+     * to 10000, which is sufficient for most parallel jobs. Each cache contains only references
+     * and numeric fields and occupies a minimal amount of memory, so this limit does not lead to
+     * excessive memory usage.
+     */
+    private static final int MAX_CACHE_NUM = 10000;
+
+    /**
+     * Buffer offset caches stored in a map.
+     *
+     * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The
+     * value is the buffer offset cache, which includes the file offset of the buffer index, the
+     * region containing the buffer index, and the next buffer index to consume.
+     */
+    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>
+            bufferOffsetCaches;
+
+    private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+
+    private final Path dataFilePath;
+
+    private final ProducerMergedPartitionFileIndex dataIndex;
+
+    private FileChannel fileChannel;
+
+    /** The current number of caches. */
+    private int numCaches;
+
+    ProducerMergedPartitionFileReader(
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
+        this.dataFilePath = dataFilePath;
+        this.dataIndex = dataIndex;
+        this.bufferOffsetCaches = new HashMap<>();
+    }
+
+    @Override
+    public Buffer readBuffer(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex,
+            MemorySegment memorySegment,
+            BufferRecycler recycler)
+            throws IOException {
+
+        lazyInitializeFileChannel();
+        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
+                Tuple2.of(subpartitionId, bufferIndex);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey);
+        if (!cache.isPresent()) {
+            return null;
+        }
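+        // Position the channel at the cached offset and read a single buffer
+        // (header plus data) into the provided memory segment.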
+        fileChannel.position(cache.get().getFileOffset());
+        Buffer buffer =
+                readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
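+        // Advance the cache past this buffer (data plus header); the return value
+        // indicates whether there is a next buffer to consume.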
+        boolean hasBuffer =
+                cache.get()
+                        .advance(
+                                checkNotNull(buffer).readableBytes()
+                                        + BufferReaderWriterUtil.HEADER_LENGTH);
+        if (hasBuffer) {
+            int nextBufferIndex = bufferIndex + 1;
+            // TODO: introduce an LRU cache strategy in the future to restrict the total
+            // number of caches. A test has been added to guard against cache leaks.
+            if (numCaches < MAX_CACHE_NUM) {
+                bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
+                numCaches++;
+            }
+        }
+        return buffer;
+    }
+
+    @Override
+    public long getPriority(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex) {
+        lazyInitializeFileChannel();
+        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
+                Tuple2.of(subpartitionId, bufferIndex);
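+        // The priority is the file offset of the requested buffer; Long.MAX_VALUE
+        // marks a buffer whose position is not (yet) known.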
+        return tryGetCache(cacheKey).map(BufferOffsetCache::getFileOffset).orElse(Long.MAX_VALUE);
+    }
+
+    @Override
+    public void release() {
+        if (fileChannel != null) {
+            try {
+                fileChannel.close();
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e, "Failed to close file channel.");
+            }
+        }
+        IOUtils.deleteFileQuietly(dataFilePath);
+    }
+
+    /**
+     * Initialize the file channel in a lazy manner, which can reduce usage of the file descriptor
+     * resource.
+     */
+    private void lazyInitializeFileChannel() {
+        if (fileChannel == null) {
+            try {
+                fileChannel = FileChannel.open(dataFilePath, StandardOpenOption.READ);
+            } catch (IOException e) {
+                ExceptionUtils.rethrow(e, "Failed to open file channel.");
+            }
+        }
+    }
+
+    /**
+     * Try to get the cache for the given key.
+     *
+     * <p>If the relevant buffer offset cache exists, it will be returned and removed from the
+     * map. If it does not exist, a new cache will be created from the data index and returned.
+     *
+     * @param cacheKey the key of the cache.
+     * @return the relevant buffer offset cache if it exists, otherwise {@link Optional#empty()}.
+     */
+    private Optional<BufferOffsetCache> tryGetCache(
+            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey) {
+        BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
+        if (bufferOffsetCache == null) {
+            Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
+            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
+        } else {
+            numCaches--;
+            return Optional.empty();

Review Comment:
   Shouldn't we return `bufferOffsetCache` here?
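   
   A minimal sketch of the suggested fix (assuming the hit path should hand back the removed entry wrapped in `Optional.of`, which is what the method's Javadoc describes):
   
   ```java
   private Optional<BufferOffsetCache> tryGetCache(
           Tuple2<TieredStorageSubpartitionId, Integer> cacheKey) {
       BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
       if (bufferOffsetCache == null) {
           Optional<Region> regionOpt = dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
           return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
       } else {
           numCaches--;
           // Cache hit: hand back the entry just removed from the map so the caller
           // can resume from the cached offset, instead of discarding it.
           return Optional.of(bufferOffsetCache);
       }
   }
   ```
   
   As written, a cache hit makes `readBuffer` return `null` even though the buffer's offset is known, because the empty `Optional` is indistinguishable from a missing region.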


