TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1312876410
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
}
}
+ /**
+ * Slice the read memory segment to multiple small network buffers.
+ *
+ * <p>Note that although the method appears to be split into multiple
buffers, the sliced
+ * buffers still share the same one actual underlying memory segment.
+ *
+ * @param byteBuffer the byte buffer to be sliced
+ * @param buffer the network buffer actually shares the same memory
segment with byteBuffer.
+ * This argument is only to call the method
NetworkBuffer#readOnlySlice to read a slice of a
+ * memory segment
+ * @param partialBuffer the partial buffer, if the partial buffer is not
null, it contains the
+ * partial data buffer from the previous read
+ * @param readBuffers the read buffers list is to accept the sliced buffers
+ * @return the first field is the partial data buffer, the second field is
the partial buffer's
+ * header, indicating the actual length of the partial buffer, the
third field is the number
+ * of sliced bytes.
+ */
+ private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+ ByteBuffer byteBuffer,
+ NetworkBuffer buffer,
+ @Nullable PartialBuffer partialBuffer,
+ List<Buffer> readBuffers) {
+ BufferHeader header = partialBuffer == null ? null :
partialBuffer.getBufferHeader();
+ CompositeBuffer slicedBuffer =
+ partialBuffer == null ? null :
partialBuffer.getCompositeBuffer();
+ checkState(reusedHeaderBuffer.position() == 0);
+ checkState(header != null || slicedBuffer == null);
+ checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+ int numSlicedBytes = 0;
+ while (byteBuffer.hasRemaining()) {
Review Comment:
Refatored the method.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -204,6 +279,159 @@ private Optional<BufferOffsetCache> tryGetCache(
}
}
+ /**
+ * Slice the read memory segment to multiple small network buffers.
+ *
+ * <p>Note that although the method appears to be split into multiple
buffers, the sliced
+ * buffers still share the same one actual underlying memory segment.
+ *
+ * @param byteBuffer the byte buffer to be sliced
+ * @param buffer the network buffer actually shares the same memory
segment with byteBuffer.
+ * This argument is only to call the method
NetworkBuffer#readOnlySlice to read a slice of a
+ * memory segment
+ * @param partialBuffer the partial buffer, if the partial buffer is not
null, it contains the
+ * partial data buffer from the previous read
+ * @param readBuffers the read buffers list is to accept the sliced buffers
+ * @return the first field is the partial data buffer, the second field is
the partial buffer's
+ * header, indicating the actual length of the partial buffer, the
third field is the number
+ * of sliced bytes.
+ */
+ private Tuple3<CompositeBuffer, BufferHeader, Integer> sliceBuffer(
+ ByteBuffer byteBuffer,
+ NetworkBuffer buffer,
+ @Nullable PartialBuffer partialBuffer,
+ List<Buffer> readBuffers) {
+ BufferHeader header = partialBuffer == null ? null :
partialBuffer.getBufferHeader();
+ CompositeBuffer slicedBuffer =
+ partialBuffer == null ? null :
partialBuffer.getCompositeBuffer();
+ checkState(reusedHeaderBuffer.position() == 0);
+ checkState(header != null || slicedBuffer == null);
+ checkState(slicedBuffer == null || slicedBuffer.missingLength() > 0);
+
+ int numSlicedBytes = 0;
+ while (byteBuffer.hasRemaining()) {
Review Comment:
Refactored the method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]