TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1312875141
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements
PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey,
partialBuffer, true);
if (!cache.isPresent()) {
return null;
}
- fileChannel.position(cache.get().getFileOffset());
- Buffer buffer =
- readFromByteChannel(fileChannel, reusedHeaderBuffer,
memorySegment, recycler);
- boolean hasNextBuffer =
- cache.get()
- .advance(
- checkNotNull(buffer).readableBytes()
- +
BufferReaderWriterUtil.HEADER_LENGTH);
- if (hasNextBuffer) {
- int nextBufferIndex = bufferIndex + 1;
- // TODO: introduce the LRU cache strategy in the future to
restrict the total
- // cache number. Testing to prevent cache leaks has been
implemented.
- if (numCaches < maxCacheNumber) {
- bufferOffsetCaches.put(Tuple2.of(subpartitionId,
nextBufferIndex), cache.get());
- numCaches++;
+
+ // Get the read offset, including the start offset, the end offset
+ long readStartOffset =
+ partialBuffer == null ? cache.get().getFileOffset() :
partialBuffer.getFileOffset();
+ long readEndOffset = cache.get().region.getRegionFileEndOffset();
+ checkState(readStartOffset <= readEndOffset);
+ int numBytesToRead =
+ Math.min(memorySegment.size(), (int) (readEndOffset -
readStartOffset));
+ if (numBytesToRead == 0) {
+ return null;
+ }
+ ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+ fileChannel.position(readStartOffset);
+
+ // Read data to the memory segment, note the read size is
numBytesToRead
+ readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
Review Comment:
Moved the `NetworkBuffer` construction into `sliceBuffer`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]