TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1312876023


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public List<Buffer> readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable PartialBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
+
+        List<Buffer> readBuffers = new LinkedList<>();
         Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
                 Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, 
partialBuffer, true);
         if (!cache.isPresent()) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
+
+        // Get the read offset, including the start offset, the end offset
+        long readStartOffset =
+                partialBuffer == null ? cache.get().getFileOffset() : 
partialBuffer.getFileOffset();
+        long readEndOffset = cache.get().region.getRegionFileEndOffset();
+        checkState(readStartOffset <= readEndOffset);
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - 
readStartOffset));
+        if (numBytesToRead == 0) {
+            return null;
+        }
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+
+        // Read data to the memory segment, note the read size is 
numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        int numFullBuffers;
+        int numBytesRealRead;
+        int numBytesPartial;
+        boolean noMoreDataInRegion = false;
+        try {
+            // Slice the read memory segment to multiple small network buffers 
and add them to
+            // readBuffers
+            Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+                    sliceBuffer(byteBuffer, buffer, partialBuffer, 
readBuffers);
+            numFullBuffers = readBuffers.size();
+            numBytesRealRead = partial.f2;
+            checkState(
+                    numBytesRealRead <= numBytesToRead
+                            && numBytesToRead - numBytesRealRead < 
HEADER_LENGTH);
+            if (readStartOffset + numBytesRealRead < readEndOffset) {
+                // If the region is not finished read, generate a partial 
buffer to store the
+                // partial data, then append the partial buffer to the tail of 
readBuffers

Review Comment:
   I introduced a new return value of `readBuffers` to indicate whether 
reading of the region has finished.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to