TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1312876023
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements
PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey,
partialBuffer, true);
if (!cache.isPresent()) {
return null;
}
- fileChannel.position(cache.get().getFileOffset());
- Buffer buffer =
- readFromByteChannel(fileChannel, reusedHeaderBuffer,
memorySegment, recycler);
- boolean hasNextBuffer =
- cache.get()
- .advance(
- checkNotNull(buffer).readableBytes()
- +
BufferReaderWriterUtil.HEADER_LENGTH);
- if (hasNextBuffer) {
- int nextBufferIndex = bufferIndex + 1;
- // TODO: introduce the LRU cache strategy in the future to
restrict the total
- // cache number. Testing to prevent cache leaks has been
implemented.
- if (numCaches < maxCacheNumber) {
- bufferOffsetCaches.put(Tuple2.of(subpartitionId,
nextBufferIndex), cache.get());
- numCaches++;
+
+ // Get the read offset, including the start offset, the end offset
+ long readStartOffset =
+ partialBuffer == null ? cache.get().getFileOffset() :
partialBuffer.getFileOffset();
+ long readEndOffset = cache.get().region.getRegionFileEndOffset();
+ checkState(readStartOffset <= readEndOffset);
+ int numBytesToRead =
+ Math.min(memorySegment.size(), (int) (readEndOffset -
readStartOffset));
+ if (numBytesToRead == 0) {
+ return null;
+ }
+ ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+ fileChannel.position(readStartOffset);
+
+ // Read data to the memory segment, note the read size is
numBytesToRead
+ readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+ buffer.setSize(byteBuffer.remaining());
+
+ int numFullBuffers;
+ int numBytesRealRead;
+ int numBytesPartial;
+ boolean noMoreDataInRegion = false;
+ try {
+ // Slice the read memory segment to multiple small network buffers
and add them to
+ // readBuffers
+ Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+ sliceBuffer(byteBuffer, buffer, partialBuffer,
readBuffers);
+ numFullBuffers = readBuffers.size();
+ numBytesRealRead = partial.f2;
+ checkState(
+ numBytesRealRead <= numBytesToRead
+ && numBytesToRead - numBytesRealRead <
HEADER_LENGTH);
+ if (readStartOffset + numBytesRealRead < readEndOffset) {
+ // If the region is not finished read, generate a partial
buffer to store the
+ // partial data, then append the partial buffer to the tail of
readBuffers
Review Comment:
I introduced a new return value, `readBuffers`, to indicate whether the
region has finished being read.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]