TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1319722535
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -176,91 +162,179 @@ private void lazyInitializeFileChannel() {
}
/**
- * Try to get the cache according to the key.
+ * Slice the read memory segment to multiple small network buffers.
*
- * <p>If the relevant buffer offset cache exists, it will be returned and subsequently removed.
- * However, if the buffer offset cache does not exist, a new cache will be created using the
- * data index and returned.
+ * <p>Note that although the method appears to be split into multiple buffers, the sliced
+ * buffers still share the same one actual underlying memory segment.
*
- * @param cacheKey the key of cache.
- * @param removeKey boolean decides whether to remove key.
- * @return returns the relevant buffer offset cache if it exists, otherwise return {@link
- * Optional#empty()}.
+ * @param byteBuffer the byte buffer to be sliced, it points to the underlying memorySegment
+ * @param memorySegment the underlying memory segment to be sliced
+ * @param partialBuffer the partial buffer, if the partial buffer is not null, it contains the
+ * partial data buffer from the previous read
+ * @param readBuffers the read buffers list is to accept the sliced buffers
+ * @return the first field is the partial data buffer, the second field is the number of sliced
+ * bytes.
*/
- private Optional<BufferOffsetCache> tryGetCache(
- Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean removeKey) {
- BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
- if (bufferOffsetCache == null) {
- Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt =
- dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
- return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
- } else {
- if (removeKey) {
- numCaches--;
- } else {
- bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+ private Tuple2<PartialBuffer, Integer> sliceBuffer(
+ ByteBuffer byteBuffer,
+ MemorySegment memorySegment,
+ @Nullable PartialBuffer partialBuffer,
+ BufferRecycler bufferRecycler,
+ List<Buffer> readBuffers) {
+ checkState(reusedHeaderBuffer.position() == 0);
+ checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+ NetworkBuffer buffer = new NetworkBuffer(memorySegment, bufferRecycler);
+ buffer.setSize(byteBuffer.remaining());
+
+ try {
+ int numSlicedBytes = 0;
+ if (partialBuffer != null) {
+ // If there is a previous small partial buffer, the current read operation should
+ // read additional data and combine it with the existing partial to construct a new
+ // complete buffer
+ buffer.retainBuffer();
+ int position = byteBuffer.position() + partialBuffer.missingLength();
+ int numPartialBytes = partialBuffer.missingLength();
+ partialBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
+ numSlicedBytes += numPartialBytes;
+ byteBuffer.position(position);
+ readBuffers.add(partialBuffer);
}
- return Optional.of(bufferOffsetCache);
+
+ partialBuffer = null;
+ while (byteBuffer.hasRemaining()) {
+ // Parse the small buffer's header
+ BufferHeader header = parseBufferHeader(byteBuffer);
+ if (header == null) {
+ // If the remaining data length in the buffer is not enough to construct a new
+ // complete buffer header, drop it directly.
+ break;
+ } else {
+ numSlicedBytes += HEADER_LENGTH;
+ }
+
+ if (header.getLength() <= byteBuffer.remaining()) {
+ // The remaining data length in the buffer is enough to generate a new small
+ // sliced network buffer. The small sliced buffer is not a partial buffer, we
+ // should read the slice of the buffer directly
+ buffer.retainBuffer();
+ CompositeBuffer slicedBuffer = new CompositeBuffer(header);
Review Comment:
I've switched to `ReadOnlySlicedNetworkBuffer` instead.
`ReadOnlySlicedNetworkBuffer` already provides `setDataType` and `setCompressed`, so we don't need to add a new constructor for it.
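The javadoc in the hunk describes the slicing contract: complete records become slices of one shared memory segment, an incomplete trailing header is dropped, and a record that runs past the end of the read becomes a partial buffer that the next read completes. Below is a minimal, Flink-free sketch of that loop. It assumes a hypothetical header that is only a 4-byte length (the real `BufferHeader` layout is not shown in this hunk), and `SliceSketch`, `Partial`, `Result` and `view` are illustrative names, not Flink APIs. Read-only `java.nio.ByteBuffer` views stand in for `readOnlySlice` on a `NetworkBuffer`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free model of sliceBuffer(). Each record in a read chunk is a header
 * followed by a payload. ASSUMPTION: the header is just a 4-byte length; Flink's real
 * BufferHeader also carries the data type and the compression flag.
 */
final class SliceSketch {
    static final int HEADER_LENGTH = Integer.BYTES; // assumed header size, not Flink's

    /** Hypothetical stand-in for PartialBuffer: a record split across two reads. */
    static final class Partial {
        final List<ByteBuffer> pieces = new ArrayList<>();
        final int totalLength;
        int received;

        Partial(int totalLength) {
            this.totalLength = totalLength;
        }

        int missingLength() {
            return totalLength - received;
        }

        void add(ByteBuffer piece) {
            pieces.add(piece);
            received += piece.remaining();
        }
    }

    /** Hypothetical stand-in for Tuple2<PartialBuffer, Integer>. */
    static final class Result {
        final Partial partial; // null if the chunk ended on a record boundary
        final int numSlicedBytes;

        Result(Partial partial, int numSlicedBytes) {
            this.partial = partial;
            this.numSlicedBytes = numSlicedBytes;
        }
    }

    /** Slices records into read-only views that share the chunk's memory. */
    static Result slice(ByteBuffer chunk, Partial partial, List<List<ByteBuffer>> out) {
        int numSlicedBytes = 0;
        if (partial != null) {
            // Finish the record left over from the previous read.
            int missing = partial.missingLength();
            if (missing > chunk.remaining()) {
                throw new IllegalStateException("chunk too small for the partial record");
            }
            partial.add(view(chunk, missing));
            numSlicedBytes += missing;
            out.add(partial.pieces);
        }
        partial = null;
        while (chunk.hasRemaining()) {
            if (chunk.remaining() < HEADER_LENGTH) {
                break; // incomplete header: left unconsumed and not counted
            }
            int length = chunk.getInt();
            numSlicedBytes += HEADER_LENGTH;
            if (length <= chunk.remaining()) {
                out.add(List.of(view(chunk, length))); // complete record
                numSlicedBytes += length;
            } else {
                // Payload runs past the chunk: keep what is here as a new partial record.
                partial = new Partial(length);
                int available = chunk.remaining();
                partial.add(view(chunk, available));
                numSlicedBytes += available;
            }
        }
        return new Result(partial, numSlicedBytes);
    }

    /** Returns a read-only view of the next n bytes and advances the chunk past them. */
    static ByteBuffer view(ByteBuffer chunk, int n) {
        ByteBuffer v = chunk.slice(); // shares memory, starts at chunk.position()
        v.limit(n);
        chunk.position(chunk.position() + n);
        return v.asReadOnlyBuffer();
    }
}
```

Because each view comes from `slice()` plus `asReadOnlyBuffer()`, a write to the original chunk is visible through the views, which mirrors the javadoc's note that the sliced buffers share one underlying memory segment. The diff also calls `buffer.retainBuffer()` once per slice, presumably so the shared segment is recycled only after every slice is released; heap `ByteBuffer` views need no such reference counting.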
--
This is an automated message from the Apache Git Service.