xintongsong commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1309895267


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -40,16 +49,20 @@ public interface PartitionFileReader {
      * @param bufferIndex the index of buffer
      * @param memorySegment the empty buffer to store the read buffer
      * @param recycler the buffer recycler
+     * @param partialBuffer the previous partial buffer. The partial buffer is 
not null only when
+     *     the last read has a partial buffer, it will construct a full buffer 
during the read
+     *     process.
      * @return null if there is no data otherwise a buffer.
      */
     @Nullable
-    Buffer readBuffer(
+    List<Buffer> readBuffer(

Review Comment:
   We should also update the JavaDoc to reflect the return-value change, and 
the method should no longer be `@Nullable`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer implements Buffer {
+
+        private final long fileOffset;
+
+        private final CompositeBuffer compositeBuffer;

Review Comment:
   I wonder if we can simply make `PartialBuffer` extend `CompositeBuffer`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public List<Buffer> readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable PartialBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
+
+        List<Buffer> readBuffers = new LinkedList<>();
         Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
                 Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, 
partialBuffer, true);
         if (!cache.isPresent()) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
+
+        // Get the read offset, including the start offset, the end offset
+        long readStartOffset =
+                partialBuffer == null ? cache.get().getFileOffset() : 
partialBuffer.getFileOffset();
+        long readEndOffset = cache.get().region.getRegionFileEndOffset();
+        checkState(readStartOffset <= readEndOffset);
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - 
readStartOffset));
+        if (numBytesToRead == 0) {
+            return null;
+        }
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+
+        // Read data to the memory segment, note the read size is 
numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);

Review Comment:
   Why do we need to create the NetworkBuffer here and pass both NetworkBuffer 
and ByteBuffer into sliceBuffer?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public List<Buffer> readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable PartialBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
+
+        List<Buffer> readBuffers = new LinkedList<>();
         Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
                 Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, 
partialBuffer, true);
         if (!cache.isPresent()) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
+
+        // Get the read offset, including the start offset, the end offset
+        long readStartOffset =
+                partialBuffer == null ? cache.get().getFileOffset() : 
partialBuffer.getFileOffset();
+        long readEndOffset = cache.get().region.getRegionFileEndOffset();
+        checkState(readStartOffset <= readEndOffset);
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - 
readStartOffset));
+        if (numBytesToRead == 0) {
+            return null;
+        }
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+
+        // Read data to the memory segment, note the read size is 
numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        int numFullBuffers;
+        int numBytesRealRead;
+        int numBytesPartial;
+        boolean noMoreDataInRegion = false;
+        try {
+            // Slice the read memory segment to multiple small network buffers 
and add them to
+            // readBuffers
+            Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+                    sliceBuffer(byteBuffer, buffer, partialBuffer, 
readBuffers);
+            numFullBuffers = readBuffers.size();
+            numBytesRealRead = partial.f2;
+            checkState(
+                    numBytesRealRead <= numBytesToRead
+                            && numBytesToRead - numBytesRealRead < 
HEADER_LENGTH);
+            if (readStartOffset + numBytesRealRead < readEndOffset) {
+                // If the region is not finished read, generate a partial 
buffer to store the
+                // partial data, then append the partial buffer to the tail of 
readBuffers
+                partialBuffer =
+                        new PartialBuffer(
+                                readStartOffset + numBytesRealRead, 
partial.f0, partial.f1);
+                numBytesPartial =
+                        partialBuffer.readableBytes()
+                                + (partialBuffer.getBufferHeader() == null ? 0 
: HEADER_LENGTH);
+                checkState(
+                        partialBuffer.getBufferHeader() != null
+                                || partialBuffer.getCompositeBuffer() == null);
+                readBuffers.add(partialBuffer);
+            } else {
+                // The region is read completely
+                checkState(partial.f0 == null);
+                noMoreDataInRegion = true;

Review Comment:
   Why do we need this? It's never used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -399,8 +405,48 @@ private void prepareForScheduling() {
             priority =
                     nextSegmentId < 0
                             ? Long.MAX_VALUE
-                            : partitionFileReader.getPriority(
-                                    partitionId, subpartitionId, 
nextSegmentId, nextBufferIndex);
+                            : partialBuffer != null
+                                    ? partialBuffer.getFileOffset()
+                                    : partitionFileReader.getPriority(
+                                            partitionId,
+                                            subpartitionId,
+                                            nextSegmentId,
+                                            nextBufferIndex);
+        }
+
+        private List<Buffer> readBuffersFromFileReader(

Review Comment:
   Why wrap this in a method?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer implements Buffer {
+
+        private final long fileOffset;
+
+        private final CompositeBuffer compositeBuffer;
+
+        private final BufferHeader bufferHeader;
+
+        public PartialBuffer(
+                long fileOffset, CompositeBuffer compositeBuffer, BufferHeader 
bufferHeader) {
+            checkArgument(fileOffset >= 0);
+            this.fileOffset = fileOffset;
+            this.compositeBuffer = compositeBuffer;
+            this.bufferHeader = bufferHeader;
+        }
+
+        /**
+         * Returns the underlying file offset. Note that the file offset 
includes the length of the
+         * partial buffer.
+         */
+        public long getFileOffset() {
+            return fileOffset;
+        }
+
+        public CompositeBuffer getCompositeBuffer() {
+            return compositeBuffer;
+        }
+
+        public BufferHeader getBufferHeader() {
+            return bufferHeader;
+        }
+
+        @Override
+        public boolean isBuffer() {
+            return compositeBuffer.isBuffer();
+        }
+
+        @Override
+        public MemorySegment getMemorySegment() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getMemorySegmentOffset() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public BufferRecycler getRecycler() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void setRecycler(BufferRecycler bufferRecycler) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void recycleBuffer() {
+            if (compositeBuffer != null) {
+                compositeBuffer.recycleBuffer();

Review Comment:
   How could `compositeBuffer` be null? Do we allow that at all?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer implements Buffer {
+
+        private final long fileOffset;
+
+        private final CompositeBuffer compositeBuffer;
+
+        private final BufferHeader bufferHeader;

Review Comment:
   Why do we need this?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer implements Buffer {
+
+        private final long fileOffset;
+
+        private final CompositeBuffer compositeBuffer;
+
+        private final BufferHeader bufferHeader;
+
+        public PartialBuffer(
+                long fileOffset, CompositeBuffer compositeBuffer, BufferHeader 
bufferHeader) {
+            checkArgument(fileOffset >= 0);
+            this.fileOffset = fileOffset;
+            this.compositeBuffer = compositeBuffer;
+            this.bufferHeader = bufferHeader;
+        }
+
+        /**
+         * Returns the underlying file offset. Note that the file offset 
includes the length of the
+         * partial buffer.
+         */
+        public long getFileOffset() {
+            return fileOffset;
+        }
+
+        public CompositeBuffer getCompositeBuffer() {
+            return compositeBuffer;
+        }
+
+        public BufferHeader getBufferHeader() {
+            return bufferHeader;
+        }
+
+        @Override
+        public boolean isBuffer() {
+            return compositeBuffer.isBuffer();
+        }
+
+        @Override
+        public MemorySegment getMemorySegment() {
+            throw new UnsupportedOperationException();

Review Comment:
   For all these unsupported operations, we need to understand why they are 
unsupported. Is it because they cannot be supported by a partial buffer? Or is 
it simply because we don't need them at the moment? For the latter, we'd better 
add a message so that people know they can be added in the future if needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer implements Buffer {
+
+        private final long fileOffset;

Review Comment:
   We should not need this `fileOffset`. Instead, we can derive the offset from 
the beginning offset of the buffer and the length of the partial buffer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public List<Buffer> readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable PartialBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
+
+        List<Buffer> readBuffers = new LinkedList<>();
         Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
                 Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, 
partialBuffer, true);

Review Comment:
   The cache should not be aware of the partial buffer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -338,6 +338,8 @@ private class ScheduledSubpartitionReader implements 
Comparable<ScheduledSubpart
 
         private boolean isFailed;
 
+        private PartitionFileReader.PartialBuffer partialBuffer;

Review Comment:
   Why is this a field?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -354,34 +356,38 @@ private void loadDiskDataToBuffers(Queue<MemorySegment> 
buffers, BufferRecycler
                                 + subpartitionId
                                 + " has already been failed.");
             }
-            while (!buffers.isEmpty()
-                    && nettyConnectionWriter.numQueuedBuffers() < 
maxBufferReadAhead
-                    && nextSegmentId >= 0) {
-                MemorySegment memorySegment = buffers.poll();
-                Buffer buffer;
-                try {
-                    if ((buffer =
-                                    partitionFileReader.readBuffer(
-                                            partitionId,
-                                            subpartitionId,
-                                            nextSegmentId,
-                                            nextBufferIndex,
-                                            memorySegment,
-                                            recycler))
-                            == null) {
-                        buffers.add(memorySegment);
+
+            boolean hasRegionFinishedRead = false;
+            checkState(partialBuffer == null);
+            try {
+                while (!buffers.isEmpty() && !hasRegionFinishedRead && 
nextSegmentId >= 0) {
+                    MemorySegment memorySegment = buffers.poll();
+
+                    List<Buffer> readBuffers =
+                            readBuffersFromFileReader(buffers, recycler, 
memorySegment);
+                    if (readBuffers == null) {
                         break;
                     }
-                } catch (Throwable throwable) {
-                    buffers.add(memorySegment);
-                    throw throwable;
+
+                    partialBuffer = null;
+                    for (int i = 0; i < readBuffers.size(); i++) {
+                        Buffer readBuffer = readBuffers.get(i);
+                        if (i == readBuffers.size() - 1) {
+                            if (readBuffer instanceof 
PartitionFileReader.PartialBuffer) {
+                                partialBuffer = 
(PartitionFileReader.PartialBuffer) readBuffer;
+                                continue;
+                            } else {
+                                hasRegionFinishedRead = true;
+                                checkState(partialBuffer == null);
+                            }

Review Comment:
   Why is this?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
         }
     }
 
+    /**
+     * Slice the read memory segment to multiple small network buffers.
+     *
+     * <p>Note that although the method appears to be split into multiple 
buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
+     *
+     * @param byteBuffer the byte buffer to be sliced
+     * @param buffer the network buffer actually shares the same memory 
segment with byteBuffer.
+     *     This argument is only to call the method 
NetworkBuffer#readOnlySlice to read a slice of a
+     *     memory segment
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the partial data buffer, the second field is 
the partial buffer's
+     *     header, indicating the actual length of the partial buffer, the 
third field is the number
+     *     of sliced bytes.
+     */
+    private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            NetworkBuffer buffer,
+            @Nullable PartialBuffer partialBuffer,
+            List<Buffer> readBuffers) {
+        BufferHeader header = partialBuffer == null ? null : 
partialBuffer.getBufferHeader();
+        CompositeBuffer slicedBuffer =
+                partialBuffer == null ? null : 
partialBuffer.getCompositeBuffer();
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(header != null || slicedBuffer == null);
+        checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+        int numSlicedBytes = 0;
+        while (byteBuffer.hasRemaining()) {
+            // Parse the small buffer's header
+            if (header == null) {
+                if ((header = parseBufferHeader(byteBuffer, 
reusedHeaderBuffer)) == null) {
+                    break;

Review Comment:
   Need to explain what this break means.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
         }
     }
 
+    /**
+     * Slice the read memory segment to multiple small network buffers.
+     *
+     * <p>Note that although the method appears to be split into multiple 
buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
+     *
+     * @param byteBuffer the byte buffer to be sliced
+     * @param buffer the network buffer actually shares the same memory 
segment with byteBuffer.
+     *     This argument is only to call the method 
NetworkBuffer#readOnlySlice to read a slice of a
+     *     memory segment
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the partial data buffer, the second field is 
the partial buffer's
+     *     header, indicating the actual length of the partial buffer, the 
third field is the number
+     *     of sliced bytes.
+     */
+    private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            NetworkBuffer buffer,
+            @Nullable PartialBuffer partialBuffer,
+            List<Buffer> readBuffers) {
+        BufferHeader header = partialBuffer == null ? null : 
partialBuffer.getBufferHeader();
+        CompositeBuffer slicedBuffer =
+                partialBuffer == null ? null : 
partialBuffer.getCompositeBuffer();
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(header != null || slicedBuffer == null);
+        checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+        int numSlicedBytes = 0;
+        while (byteBuffer.hasRemaining()) {

Review Comment:
   The logic in the loop seems correct, but is hard to understand.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -399,8 +405,48 @@ private void prepareForScheduling() {
             priority =
                     nextSegmentId < 0
                             ? Long.MAX_VALUE
-                            : partitionFileReader.getPriority(
-                                    partitionId, subpartitionId, 
nextSegmentId, nextBufferIndex);
+                            : partialBuffer != null

Review Comment:
   How could this happen?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -237,15 +471,18 @@ private long getFileOffset() {
          * Updates the {@link BufferOffsetCache} upon the retrieval of a 
buffer from the file using
          * the file offset in the {@link BufferOffsetCache}.
          *
-         * @param bufferSize denotes the size of the buffer.
+         * @param numBuffers denotes the number of the advanced buffers.
          * @return return true if there are remaining buffers in the region, 
otherwise return false.
          */
-        private boolean advance(long bufferSize) {
-            nextBufferIndex++;
-            fileOffset += bufferSize;
+        private boolean advanceBuffers(int numBuffers) {
+            nextBufferIndex += numBuffers;
             return nextBufferIndex < (region.getFirstBufferIndex() + 
region.getNumBuffers());
         }
 
+        private void setReadOffset(long newFileOffset) {
+            fileOffset = newFileOffset;
+        }

Review Comment:
   These two can be combined.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public List<Buffer> readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable PartialBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
+
+        List<Buffer> readBuffers = new LinkedList<>();
         Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
                 Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, 
partialBuffer, true);
         if (!cache.isPresent()) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
+
+        // Get the read offset, including the start offset, the end offset
+        long readStartOffset =
+                partialBuffer == null ? cache.get().getFileOffset() : 
partialBuffer.getFileOffset();
+        long readEndOffset = cache.get().region.getRegionFileEndOffset();
+        checkState(readStartOffset <= readEndOffset);
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - 
readStartOffset));
+        if (numBytesToRead == 0) {
+            return null;
+        }
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+
+        // Read data to the memory segment, note the read size is 
numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        int numFullBuffers;
+        int numBytesRealRead;
+        int numBytesPartial;
+        boolean noMoreDataInRegion = false;
+        try {
+            // Slice the read memory segment to multiple small network buffers 
and add them to
+            // readBuffers
+            Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+                    sliceBuffer(byteBuffer, buffer, partialBuffer, 
readBuffers);
+            numFullBuffers = readBuffers.size();
+            numBytesRealRead = partial.f2;
+            checkState(
+                    numBytesRealRead <= numBytesToRead
+                            && numBytesToRead - numBytesRealRead < 
HEADER_LENGTH);
+            if (readStartOffset + numBytesRealRead < readEndOffset) {
+                // If the region is not finished read, generate a partial 
buffer to store the
+                // partial data, then append the partial buffer to the tail of 
readBuffers

Review Comment:
   This doesn't make sense. The region not being finished doesn't necessarily 
mean there's a partial buffer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to