xintongsong commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1309895267
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -40,16 +49,20 @@ public interface PartitionFileReader {
* @param bufferIndex the index of buffer
* @param memorySegment the empty buffer to store the read buffer
* @param recycler the buffer recycler
+ * @param partialBuffer the previous partial buffer. The partial buffer is not null only when
+ * the last read has a partial buffer, it will construct a full buffer during the read
+ * process.
* @return null if there is no data otherwise a buffer.
*/
@Nullable
- Buffer readBuffer(
+ List<Buffer> readBuffer(
Review Comment:
We should also update the JavaDoc w.r.t. the return-value change. And it should
no longer be `@Nullable`.
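E.g., something along these lines (just a sketch; returning an empty list rather than null is one way to drop the `@Nullable`):

```java
/**
 * ...
 * @return the buffers read from the partition file; an empty list (never null) when there
 *     is currently no data to read
 */
List<Buffer> readBuffer(
        TieredStoragePartitionId partitionId,
        TieredStorageSubpartitionId subpartitionId,
        int segmentId,
        int bufferIndex,
        MemorySegment memorySegment,
        BufferRecycler recycler,
        @Nullable PartialBuffer partialBuffer)
        throws IOException;
```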
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
/** Release the {@link PartitionFileReader}. */
void release();
+
+ /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+ class PartialBuffer implements Buffer {
+
+ private final long fileOffset;
+
+ private final CompositeBuffer compositeBuffer;
Review Comment:
I wonder if we can simply make `PartialBuffer` extends `CompositeBuffer`.
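Something like this (untested sketch; assumes `CompositeBuffer` has, or can get, a constructor taking the `BufferHeader`):

```java
/** A {@link PartialBuffer} is a part slice of a larger buffer. */
class PartialBuffer extends CompositeBuffer {

    private final long fileOffset;

    PartialBuffer(long fileOffset, BufferHeader bufferHeader) {
        // Assumption: CompositeBuffer exposes (or can be given) this constructor.
        super(bufferHeader);
        this.fileOffset = fileOffset;
    }
}
```

That would let us drop most of the delegating and unsupported-operation methods below.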
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, partialBuffer, true);
if (!cache.isPresent()) {
return null;
}
- fileChannel.position(cache.get().getFileOffset());
- Buffer buffer =
- readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
- boolean hasNextBuffer =
- cache.get()
- .advance(
- checkNotNull(buffer).readableBytes()
- + BufferReaderWriterUtil.HEADER_LENGTH);
- if (hasNextBuffer) {
- int nextBufferIndex = bufferIndex + 1;
- // TODO: introduce the LRU cache strategy in the future to restrict the total
- // cache number. Testing to prevent cache leaks has been implemented.
- if (numCaches < maxCacheNumber) {
- bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
- numCaches++;
+
+ // Get the read offset, including the start offset, the end offset
+ long readStartOffset =
+ partialBuffer == null ? cache.get().getFileOffset() : partialBuffer.getFileOffset();
+ long readEndOffset = cache.get().region.getRegionFileEndOffset();
+ checkState(readStartOffset <= readEndOffset);
+ int numBytesToRead =
+ Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+ if (numBytesToRead == 0) {
+ return null;
+ }
+ ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+ fileChannel.position(readStartOffset);
+
+ // Read data to the memory segment, note the read size is numBytesToRead
+ readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
Review Comment:
Why do we need to create the NetworkBuffer here and pass both NetworkBuffer
and ByteBuffer into sliceBuffer?
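If the `NetworkBuffer` is only needed so that `sliceBuffer` can call `readOnlySlice`, we could pass the memory segment once and derive both views inside `sliceBuffer`, e.g. (sketch; `numBytesRead` stands for whatever the read step produced):

```java
// Inside sliceBuffer: both views wrap the same segment, so the caller does
// not need to construct and hand over two wrappers around it.
ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesRead);
NetworkBuffer fullSlice = new NetworkBuffer(memorySegment, recycler);
fullSlice.setSize(numBytesRead);
```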
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, partialBuffer, true);
if (!cache.isPresent()) {
return null;
}
- fileChannel.position(cache.get().getFileOffset());
- Buffer buffer =
- readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
- boolean hasNextBuffer =
- cache.get()
- .advance(
- checkNotNull(buffer).readableBytes()
- + BufferReaderWriterUtil.HEADER_LENGTH);
- if (hasNextBuffer) {
- int nextBufferIndex = bufferIndex + 1;
- // TODO: introduce the LRU cache strategy in the future to restrict the total
- // cache number. Testing to prevent cache leaks has been implemented.
- if (numCaches < maxCacheNumber) {
- bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
- numCaches++;
+
+ // Get the read offset, including the start offset, the end offset
+ long readStartOffset =
+ partialBuffer == null ? cache.get().getFileOffset() : partialBuffer.getFileOffset();
+ long readEndOffset = cache.get().region.getRegionFileEndOffset();
+ checkState(readStartOffset <= readEndOffset);
+ int numBytesToRead =
+ Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+ if (numBytesToRead == 0) {
+ return null;
+ }
+ ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+ fileChannel.position(readStartOffset);
+
+ // Read data to the memory segment, note the read size is numBytesToRead
+ readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+ buffer.setSize(byteBuffer.remaining());
+
+ int numFullBuffers;
+ int numBytesRealRead;
+ int numBytesPartial;
+ boolean noMoreDataInRegion = false;
+ try {
+ // Slice the read memory segment to multiple small network buffers and add them to
+ // readBuffers
+ Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+ sliceBuffer(byteBuffer, buffer, partialBuffer, readBuffers);
+ numFullBuffers = readBuffers.size();
+ numBytesRealRead = partial.f2;
+ checkState(
+ numBytesRealRead <= numBytesToRead
+ && numBytesToRead - numBytesRealRead < HEADER_LENGTH);
+ if (readStartOffset + numBytesRealRead < readEndOffset) {
+ // If the region is not finished read, generate a partial buffer to store the
+ // partial data, then append the partial buffer to the tail of readBuffers
+ partialBuffer =
+ new PartialBuffer(
+ readStartOffset + numBytesRealRead, partial.f0, partial.f1);
+ numBytesPartial =
+ partialBuffer.readableBytes()
+ + (partialBuffer.getBufferHeader() == null ? 0 : HEADER_LENGTH);
+ checkState(
+ partialBuffer.getBufferHeader() != null
+ || partialBuffer.getCompositeBuffer() == null);
+ readBuffers.add(partialBuffer);
+ } else {
+ // The region is read completely
+ checkState(partial.f0 == null);
+ noMoreDataInRegion = true;
Review Comment:
Why do we need this? It's never used.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -399,8 +405,48 @@ private void prepareForScheduling() {
priority =
nextSegmentId < 0
? Long.MAX_VALUE
- : partitionFileReader.getPriority(
- partitionId, subpartitionId, nextSegmentId, nextBufferIndex);
+ : partialBuffer != null
+ ? partialBuffer.getFileOffset()
+ : partitionFileReader.getPriority(
+ partitionId,
+ subpartitionId,
+ nextSegmentId,
+ nextBufferIndex);
+ }
+
+ private List<Buffer> readBuffersFromFileReader(
Review Comment:
Why wrap this in a method?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
/** Release the {@link PartitionFileReader}. */
void release();
+
+ /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+ class PartialBuffer implements Buffer {
+
+ private final long fileOffset;
+
+ private final CompositeBuffer compositeBuffer;
+
+ private final BufferHeader bufferHeader;
+
+ public PartialBuffer(
+ long fileOffset, CompositeBuffer compositeBuffer, BufferHeader bufferHeader) {
+ checkArgument(fileOffset >= 0);
+ this.fileOffset = fileOffset;
+ this.compositeBuffer = compositeBuffer;
+ this.bufferHeader = bufferHeader;
+ }
+
+ /**
+ * Returns the underlying file offset. Note that the file offset includes the length of the
+ * partial buffer.
+ */
+ public long getFileOffset() {
+ return fileOffset;
+ }
+
+ public CompositeBuffer getCompositeBuffer() {
+ return compositeBuffer;
+ }
+
+ public BufferHeader getBufferHeader() {
+ return bufferHeader;
+ }
+
+ @Override
+ public boolean isBuffer() {
+ return compositeBuffer.isBuffer();
+ }
+
+ @Override
+ public MemorySegment getMemorySegment() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMemorySegmentOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BufferRecycler getRecycler() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setRecycler(BufferRecycler bufferRecycler) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void recycleBuffer() {
+ if (compositeBuffer != null) {
+ compositeBuffer.recycleBuffer();
Review Comment:
How could `compositeBuffer` be null? Do we allow that at all?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
/** Release the {@link PartitionFileReader}. */
void release();
+
+ /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+ class PartialBuffer implements Buffer {
+
+ private final long fileOffset;
+
+ private final CompositeBuffer compositeBuffer;
+
+ private final BufferHeader bufferHeader;
Review Comment:
Why do we need this?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
/** Release the {@link PartitionFileReader}. */
void release();
+
+ /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+ class PartialBuffer implements Buffer {
+
+ private final long fileOffset;
+
+ private final CompositeBuffer compositeBuffer;
+
+ private final BufferHeader bufferHeader;
+
+ public PartialBuffer(
+ long fileOffset, CompositeBuffer compositeBuffer, BufferHeader bufferHeader) {
+ checkArgument(fileOffset >= 0);
+ this.fileOffset = fileOffset;
+ this.compositeBuffer = compositeBuffer;
+ this.bufferHeader = bufferHeader;
+ }
+
+ /**
+ * Returns the underlying file offset. Note that the file offset includes the length of the
+ * partial buffer.
+ */
+ public long getFileOffset() {
+ return fileOffset;
+ }
+
+ public CompositeBuffer getCompositeBuffer() {
+ return compositeBuffer;
+ }
+
+ public BufferHeader getBufferHeader() {
+ return bufferHeader;
+ }
+
+ @Override
+ public boolean isBuffer() {
+ return compositeBuffer.isBuffer();
+ }
+
+ @Override
+ public MemorySegment getMemorySegment() {
+ throw new UnsupportedOperationException();
Review Comment:
For all these unsupported operations, we need to understand why they are
unsupported. Is it because they cannot be supported by a partial buffer? Or is it
simply because we don't need them at the moment? For the latter, we'd better
add a message so that people know they can be added in the future if needed.
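For the latter case, something like this would make the intent explicit (sketch):

```java
@Override
public MemorySegment getMemorySegment() {
    // Not needed by the current tiered-storage read path; implement if a
    // future caller requires it.
    throw new UnsupportedOperationException(
            "PartialBuffer does not support getMemorySegment yet; add it if needed.");
}
```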
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +91,165 @@ long getPriority(
/** Release the {@link PartitionFileReader}. */
void release();
+
+ /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+ class PartialBuffer implements Buffer {
+
+ private final long fileOffset;
Review Comment:
We should not need this `fileOffset`. Instead, we can derive the offset from
the beginning offset of the buffer and the length of the partial buffer.
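I.e., something like this at the call site (sketch; `bufferBeginningOffset` is a hypothetical name for the file offset at which the partially-read buffer starts):

```java
// Derive the offset instead of storing it in PartialBuffer: beginning offset
// of the buffer plus the length of the partial buffer (data + header, if any).
long partialLength =
        partialBuffer.readableBytes()
                + (partialBuffer.getBufferHeader() == null ? 0 : HEADER_LENGTH);
long readStartOffset = bufferBeginningOffset + partialLength;
```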
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, partialBuffer, true);
Review Comment:
The cache should not be aware of the partial buffer.
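I.e., keep the original `tryGetCache(cacheKey, true)` and apply the partial buffer at the call site only (sketch):

```java
Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
if (!cache.isPresent()) {
    return null;
}
// The partial buffer only affects where this read starts; the cache lookup
// itself does not need to know about it.
long readStartOffset =
        partialBuffer == null ? cache.get().getFileOffset() : partialBuffer.getFileOffset();
```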
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -338,6 +338,8 @@ private class ScheduledSubpartitionReader implements Comparable<ScheduledSubpart
private boolean isFailed;
+ private PartitionFileReader.PartialBuffer partialBuffer;
Review Comment:
Why is this a field?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -354,34 +356,38 @@ private void loadDiskDataToBuffers(Queue<MemorySegment> buffers, BufferRecycler
+ subpartitionId
+ " has already been failed.");
}
- while (!buffers.isEmpty()
- && nettyConnectionWriter.numQueuedBuffers() < maxBufferReadAhead
- && nextSegmentId >= 0) {
- MemorySegment memorySegment = buffers.poll();
- Buffer buffer;
- try {
- if ((buffer =
- partitionFileReader.readBuffer(
- partitionId,
- subpartitionId,
- nextSegmentId,
- nextBufferIndex,
- memorySegment,
- recycler))
- == null) {
- buffers.add(memorySegment);
+
+ boolean hasRegionFinishedRead = false;
+ checkState(partialBuffer == null);
+ try {
+ while (!buffers.isEmpty() && !hasRegionFinishedRead && nextSegmentId >= 0) {
+ MemorySegment memorySegment = buffers.poll();
+
+ List<Buffer> readBuffers =
+ readBuffersFromFileReader(buffers, recycler, memorySegment);
+ if (readBuffers == null) {
break;
}
- } catch (Throwable throwable) {
- buffers.add(memorySegment);
- throw throwable;
+
+ partialBuffer = null;
+ for (int i = 0; i < readBuffers.size(); i++) {
+ Buffer readBuffer = readBuffers.get(i);
+ if (i == readBuffers.size() - 1) {
+ if (readBuffer instanceof PartitionFileReader.PartialBuffer) {
+ partialBuffer = (PartitionFileReader.PartialBuffer) readBuffer;
+ continue;
+ } else {
+ hasRegionFinishedRead = true;
+ checkState(partialBuffer == null);
+ }
Review Comment:
Why is this?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
}
}
+ /**
+ * Slice the read memory segment to multiple small network buffers.
+ *
+ * <p>Note that although the method appears to be split into multiple buffers, the sliced
+ * buffers still share the same one actual underlying memory segment.
+ *
+ * @param byteBuffer the byte buffer to be sliced
+ * @param buffer the network buffer actually shares the same memory segment with byteBuffer.
+ * This argument is only to call the method NetworkBuffer#readOnlySlice to read a slice of a
+ * memory segment
+ * @param partialBuffer the partial buffer, if the partial buffer is not null, it contains the
+ * partial data buffer from the previous read
+ * @param readBuffers the read buffers list is to accept the sliced buffers
+ * @return the first field is the partial data buffer, the second field is the partial buffer's
+ * header, indicating the actual length of the partial buffer, the third field is the number
+ * of sliced bytes.
+ */
+ private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+ ByteBuffer byteBuffer,
+ NetworkBuffer buffer,
+ @Nullable PartialBuffer partialBuffer,
+ List<Buffer> readBuffers) {
+ BufferHeader header = partialBuffer == null ? null : partialBuffer.getBufferHeader();
+ CompositeBuffer slicedBuffer =
+ partialBuffer == null ? null : partialBuffer.getCompositeBuffer();
+ checkState(reusedHeaderBuffer.position() == 0);
+ checkState(header != null || slicedBuffer == null);
+ checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+ int numSlicedBytes = 0;
+ while (byteBuffer.hasRemaining()) {
+ // Parse the small buffer's header
+ if (header == null) {
+ if ((header = parseBufferHeader(byteBuffer, reusedHeaderBuffer)) == null) {
+ break;
Review Comment:
We need to explain what this `break` means.
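E.g. (sketch, assuming the `break` means the remaining bytes are shorter than a full header):

```java
if ((header = parseBufferHeader(byteBuffer, reusedHeaderBuffer)) == null) {
    // Fewer bytes remain than a full header: stop slicing here. The leftover
    // bytes are kept in reusedHeaderBuffer and the header is completed on the
    // next read.
    break;
}
```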
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
}
}
+ /**
+ * Slice the read memory segment to multiple small network buffers.
+ *
+ * <p>Note that although the method appears to be split into multiple buffers, the sliced
+ * buffers still share the same one actual underlying memory segment.
+ *
+ * @param byteBuffer the byte buffer to be sliced
+ * @param buffer the network buffer actually shares the same memory segment with byteBuffer.
+ * This argument is only to call the method NetworkBuffer#readOnlySlice to read a slice of a
+ * memory segment
+ * @param partialBuffer the partial buffer, if the partial buffer is not null, it contains the
+ * partial data buffer from the previous read
+ * @param readBuffers the read buffers list is to accept the sliced buffers
+ * @return the first field is the partial data buffer, the second field is the partial buffer's
+ * header, indicating the actual length of the partial buffer, the third field is the number
+ * of sliced bytes.
+ */
+ private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+ ByteBuffer byteBuffer,
+ NetworkBuffer buffer,
+ @Nullable PartialBuffer partialBuffer,
+ List<Buffer> readBuffers) {
+ BufferHeader header = partialBuffer == null ? null : partialBuffer.getBufferHeader();
+ CompositeBuffer slicedBuffer =
+ partialBuffer == null ? null : partialBuffer.getCompositeBuffer();
+ checkState(reusedHeaderBuffer.position() == 0);
+ checkState(header != null || slicedBuffer == null);
+ checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+ int numSlicedBytes = 0;
+ while (byteBuffer.hasRemaining()) {
Review Comment:
The logic in the loop seems correct, but is hard to understand.
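Maybe restructure it around the per-record states explicitly. A self-contained toy version of the pattern (plain `ByteBuffer`s and a simple `[int length][payload]` record format, not Flink's actual header layout):

```java
import java.nio.ByteBuffer;
import java.util.List;

class SliceLoopSketch {
    private static final int HEADER_LENGTH = Integer.BYTES;

    /** Slices complete records out of {@code chunk}; whatever remains is a partial record. */
    static void sliceCompleteRecords(ByteBuffer chunk, List<ByteBuffer> out) {
        while (chunk.remaining() >= HEADER_LENGTH) {
            // Peek the header without consuming it.
            int length = chunk.getInt(chunk.position());
            if (chunk.remaining() < HEADER_LENGTH + length) {
                // The record's payload continues in the next read chunk.
                break;
            }
            // Emit one complete record as a slice sharing the same memory.
            chunk.position(chunk.position() + HEADER_LENGTH);
            ByteBuffer record = chunk.slice();
            record.limit(length);
            out.add(record);
            chunk.position(chunk.position() + length);
        }
        // On exit, chunk.remaining() bytes belong to a partial header or payload.
    }
}
```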
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -399,8 +405,48 @@ private void prepareForScheduling() {
priority =
nextSegmentId < 0
? Long.MAX_VALUE
- : partitionFileReader.getPriority(
- partitionId, subpartitionId, nextSegmentId, nextBufferIndex);
+ : partialBuffer != null
Review Comment:
How could this happen?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -237,15 +471,18 @@ private long getFileOffset() {
* Updates the {@link BufferOffsetCache} upon the retrieval of a buffer from the file using
* the file offset in the {@link BufferOffsetCache}.
*
- * @param bufferSize denotes the size of the buffer.
+ * @param numBuffers denotes the number of the advanced buffers.
* @return return true if there are remaining buffers in the region, otherwise return false.
*/
- private boolean advance(long bufferSize) {
- nextBufferIndex++;
- fileOffset += bufferSize;
+ private boolean advanceBuffers(int numBuffers) {
+ nextBufferIndex += numBuffers;
return nextBufferIndex < (region.getFirstBufferIndex() + region.getNumBuffers());
}
+ private void setReadOffset(long newFileOffset) {
+ fileOffset = newFileOffset;
+ }
Review Comment:
These two can be combined.
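E.g. (sketch):

```java
/** Advances the cache by {@code numBuffers} buffers and moves the file offset accordingly. */
private boolean advance(int numBuffers, long newFileOffset) {
    nextBufferIndex += numBuffers;
    fileOffset = newFileOffset;
    return nextBufferIndex < (region.getFirstBufferIndex() + region.getNumBuffers());
}
```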
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -99,40 +115,96 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader {
}
@Override
- public Buffer readBuffer(
+ public List<Buffer> readBuffer(
TieredStoragePartitionId partitionId,
TieredStorageSubpartitionId subpartitionId,
int segmentId,
int bufferIndex,
MemorySegment memorySegment,
- BufferRecycler recycler)
+ BufferRecycler recycler,
+ @Nullable PartialBuffer partialBuffer)
throws IOException {
lazyInitializeFileChannel();
+
+ List<Buffer> readBuffers = new LinkedList<>();
Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
Tuple2.of(subpartitionId, bufferIndex);
- Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
+ Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, partialBuffer, true);
if (!cache.isPresent()) {
return null;
}
- fileChannel.position(cache.get().getFileOffset());
- Buffer buffer =
- readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
- boolean hasNextBuffer =
- cache.get()
- .advance(
- checkNotNull(buffer).readableBytes()
- + BufferReaderWriterUtil.HEADER_LENGTH);
- if (hasNextBuffer) {
- int nextBufferIndex = bufferIndex + 1;
- // TODO: introduce the LRU cache strategy in the future to restrict the total
- // cache number. Testing to prevent cache leaks has been implemented.
- if (numCaches < maxCacheNumber) {
- bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
- numCaches++;
+
+ // Get the read offset, including the start offset, the end offset
+ long readStartOffset =
+ partialBuffer == null ? cache.get().getFileOffset() : partialBuffer.getFileOffset();
+ long readEndOffset = cache.get().region.getRegionFileEndOffset();
+ checkState(readStartOffset <= readEndOffset);
+ int numBytesToRead =
+ Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+ if (numBytesToRead == 0) {
+ return null;
+ }
+ ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+ fileChannel.position(readStartOffset);
+
+ // Read data to the memory segment, note the read size is numBytesToRead
+ readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, recycler);
+ buffer.setSize(byteBuffer.remaining());
+
+ int numFullBuffers;
+ int numBytesRealRead;
+ int numBytesPartial;
+ boolean noMoreDataInRegion = false;
+ try {
+ // Slice the read memory segment to multiple small network buffers and add them to
+ // readBuffers
+ Tuple3<CompositeBuffer, BufferHeader, Integer> partial =
+ sliceBuffer(byteBuffer, buffer, partialBuffer, readBuffers);
+ numFullBuffers = readBuffers.size();
+ numBytesRealRead = partial.f2;
+ checkState(
+ numBytesRealRead <= numBytesToRead
+ && numBytesToRead - numBytesRealRead < HEADER_LENGTH);
+ if (readStartOffset + numBytesRealRead < readEndOffset) {
+ // If the region is not finished read, generate a partial buffer to store the
+ // partial data, then append the partial buffer to the tail of readBuffers
Review Comment:
This doesn't make sense. The region not being finished doesn't necessarily mean
there's a partial buffer.
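The condition should probably be whether `sliceBuffer` actually stopped mid-buffer, e.g. (sketch):

```java
// Only create a PartialBuffer when slicing actually stopped mid-buffer (a
// header was parsed but its data is incomplete). The read may also stop
// exactly on a buffer boundary while the region still has more data.
if (partial.f1 != null) {
    partialBuffer =
            new PartialBuffer(readStartOffset + numBytesRealRead, partial.f0, partial.f1);
    readBuffers.add(partialBuffer);
}
```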
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]