TanYuxin-tyx commented on code in PR #22833:
URL: https://github.com/apache/flink/pull/22833#discussion_r1241083798


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferContainer.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The buffer container for accumulating the records into {@link Buffer}s. After accumulating, the
+ * {@link SortBufferAccumulator} will read the sorted buffers.
+ */
+public class SortBufferContainer {
+
+    /**
+     * Size of an index entry: 4 bytes for record length, 4 bytes for data 
type and 8 bytes for
+     * pointer to next entry.
+     */
+    private static final int INDEX_ENTRY_SIZE = 4 + 4 + 8;
+
+    /** A list of {@link MemorySegment}s used to store data in memory. */
+    private final LinkedList<MemorySegment> freeSegments;
+
+    /** A segment list as a joint buffer which stores all records and index 
entries. */
+    private final ArrayList<MemorySegment> dataSegments;
+
+    /** {@link BufferRecycler} used to recycle {@link #freeSegments}. */
+    private final BufferRecycler bufferRecycler;
+
+    /** Addresses of the first record's index entry for each subpartition. */
+    private final long[] subpartitionFirstBufferIndexEntries;
+
+    /** Addresses of the last record's index entry for each subpartition. */
+    private final long[] subpartitionLastBufferIndexEntries;
+
+    /** Size of buffers requested from buffer pool. All buffers must be of the 
same size. */
+    private final int bufferSizeBytes;
+
+    /**
+     * Number of guaranteed buffers that can be allocated from the buffer pool for sorting data.
+     */
+    private final int numBuffersForSort;
+
+    // ------------------------------------------------------------------------
+    // The statistics and states
+    // ------------------------------------------------------------------------
+
+    /** Total number of bytes already appended to this sort buffer. */
+    private long numTotalBytes;
+
+    /** Total number of bytes already read from this sort buffer. */
+    private long numTotalBytesRead;
+
+    /** Whether this sort buffer is finished. One can only read a finished 
sort buffer. */
+    private boolean isFinished;
+
+    /** Whether this sort buffer is released. A released sort buffer cannot be used. */
+    private boolean isReleased;
+
+    // ------------------------------------------------------------------------
+    // For writing
+    // ------------------------------------------------------------------------
+
+    /** Array index in the segment list of the current available buffer for 
writing. */
+    private int writeBufferIndex;
+
+    /** Next position in the current available buffer for writing. */
+    private int writeOffsetInCurrentBuffer;
+
+    // ------------------------------------------------------------------------
+    // For reading
+    // ------------------------------------------------------------------------
+
+    /** Index entry address of the current record or event to be read. */
+    private long readBufferIndexEntry;
+
+    /**
+     * Record the bytes remaining after the last read, which must be 
initialized before reading a
+     * new record.
+     */
+    private int recordRemainingBytesToRead;
+
+    /** The id of the subpartition that data is being read from. */
+    private int readingSubpartitionId = -1;
+
+    SortBufferContainer(
+            LinkedList<MemorySegment> freeSegments,
+            BufferRecycler bufferRecycler,
+            int numSubpartitions,
+            int bufferSizeBytes,
+            int numBuffersForSort) {
+        checkArgument(bufferSizeBytes > INDEX_ENTRY_SIZE, "Buffer size is too 
small.");
+        checkArgument(numBuffersForSort > 0, "No guaranteed buffers for 
sort.");
+        checkState(numBuffersForSort <= freeSegments.size(), "Wrong number of 
free segments.");
+
+        this.freeSegments = checkNotNull(freeSegments);
+        this.bufferRecycler = checkNotNull(bufferRecycler);
+        this.bufferSizeBytes = bufferSizeBytes;
+        this.numBuffersForSort = numBuffersForSort;
+        this.dataSegments = new ArrayList<>();
+        this.subpartitionFirstBufferIndexEntries = new long[numSubpartitions];
+        this.subpartitionLastBufferIndexEntries = new long[numSubpartitions];
+
+        Arrays.fill(subpartitionFirstBufferIndexEntries, -1L);
+        Arrays.fill(subpartitionLastBufferIndexEntries, -1L);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by SortBufferAccumulator
+    // ------------------------------------------------------------------------
+
+    /**
+     * Note that no partial records will be written to this {@link SortBufferContainer}, which means
+     * that either all data of the target record will be written or nothing will be written.
+     *
+     * @param record the record to be written
+     * @param subpartitionId the subpartition id
+     * @param dataType the data type of the record
+     * @return true if the {@link SortBufferContainer} is full, or false if the container is
+     *     not full

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to