TanYuxin-tyx commented on code in PR #22833: URL: https://github.com/apache/flink/pull/22833#discussion_r1241083798
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferContainer.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import org.apache.commons.lang3.tuple.Pair; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The buffer container for accumulating the records into {@link Buffer}s. After accumulating, the + * {@link SortBufferAccumulator} will read the sorted buffers. 
+ */ +public class SortBufferContainer { + + /** + * Size of an index entry: 4 bytes for record length, 4 bytes for data type and 8 bytes for + * pointer to next entry. + */ + private static final int INDEX_ENTRY_SIZE = 4 + 4 + 8; + + /** A list of {@link MemorySegment}s used to store data in memory. */ + private final LinkedList<MemorySegment> freeSegments; + + /** A segment list as a joint buffer which stores all records and index entries. */ + private final ArrayList<MemorySegment> dataSegments; + + /** {@link BufferRecycler} used to recycle {@link #freeSegments}. */ + private final BufferRecycler bufferRecycler; + + /** Addresses of the first record's index entry for each subpartition. */ + private final long[] subpartitionFirstBufferIndexEntries; + + /** Addresses of the last record's index entry for each subpartition. */ + private final long[] subpartitionLastBufferIndexEntries; + + /** Size of buffers requested from buffer pool. All buffers must be of the same size. */ + private final int bufferSizeBytes; + + /** Number of guaranteed buffers can be allocated from the buffer pool for data sort. */ + private final int numBuffersForSort; + + // ------------------------------------------------------------------------ + // The statistics and states + // ------------------------------------------------------------------------ + + /** Total number of bytes already appended to this sort buffer. */ + private long numTotalBytes; + + /** Total number of bytes already read from this sort buffer. */ + private long numTotalBytesRead; + + /** Whether this sort buffer is finished. One can only read a finished sort buffer. */ + private boolean isFinished; + + /** Whether this sort buffer is released. A released sort buffer can not be used. 
*/ + private boolean isReleased; + + // ------------------------------------------------------------------------ + // For writing + // ------------------------------------------------------------------------ + + /** Array index in the segment list of the current available buffer for writing. */ + private int writeBufferIndex; + + /** Next position in the current available buffer for writing. */ + private int writeOffsetInCurrentBuffer; + + // ------------------------------------------------------------------------ + // For reading + // ------------------------------------------------------------------------ + + /** Index entry address of the current record or event to be read. */ + private long readBufferIndexEntry; + + /** + * Record the bytes remaining after the last read, which must be initialized before reading a + * new record. + */ + private int recordRemainingBytesToRead; + + /** The subpartition that is reading data from. */ + private int readingSubpartitionId = -1; + + SortBufferContainer( + LinkedList<MemorySegment> freeSegments, + BufferRecycler bufferRecycler, + int numSubpartitions, + int bufferSizeBytes, + int numBuffersForSort) { + checkArgument(bufferSizeBytes > INDEX_ENTRY_SIZE, "Buffer size is too small."); + checkArgument(numBuffersForSort > 0, "No guaranteed buffers for sort."); + checkState(numBuffersForSort <= freeSegments.size(), "Wrong number of free segments."); + + this.freeSegments = checkNotNull(freeSegments); + this.bufferRecycler = checkNotNull(bufferRecycler); + this.bufferSizeBytes = bufferSizeBytes; + this.numBuffersForSort = numBuffersForSort; + this.dataSegments = new ArrayList<>(); + this.subpartitionFirstBufferIndexEntries = new long[numSubpartitions]; + this.subpartitionLastBufferIndexEntries = new long[numSubpartitions]; + + Arrays.fill(subpartitionFirstBufferIndexEntries, -1L); + Arrays.fill(subpartitionLastBufferIndexEntries, -1L); + } + + // ------------------------------------------------------------------------ + // 
Called by SortBufferAccumulator + // ------------------------------------------------------------------------ + + /** + * Note that no partial records will be written to this {@link SortBufferContainer}, which means + * that either all data of the target record will be written or nothing will be written. + * + * @param record the record to be written + * @param subpartitionId the subpartition id + * @param dataType the data type of the record + * @return true if the {@link SortBufferContainer} is full, or return false if the container is + * not full Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
