gaoyunhaii commented on a change in pull request #18505:
URL: https://github.com/apache/flink/pull/18505#discussion_r801233645



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -365,16 +392,17 @@ private void updateStatistics(Buffer buffer, boolean 
isBroadcast) {
     private void writeLargeRecord(
             ByteBuffer record, int targetSubpartition, DataType dataType, 
boolean isBroadcast)
             throws IOException {
+        checkState(numBuffersForWrite > 0, "No buffers available for 
writing.");

Review comment:
       Would this cause problem if there is large records when using hash-based 
implementation? Might we keep at least one buffer for write? 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/HashBasedPartitionSortedBuffer.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * * A {@link SortBuffer} implementation which sorts all appended records only 
by subpartition
+ * index. Records of the same subpartition keep the appended order.
+ *
+ * <p>Different from the {@link SortBasedPartitionSortedBuffer}, in this 
{@link SortBuffer}
+ * implementation, memory segment boundary serves as the nature data boundary 
of different
+ * subpartitions, which means that one memory segment can never contain data 
from different
+ * subpartitions.
+ */
+public class HashBasedPartitionSortedBuffer implements SortBuffer {
+
+    /** A buffer pool to request memory segments from. */
+    private final BufferPool bufferPool;
+
+    /** Number of guaranteed buffers can be allocated from the buffer pool for 
data sort. */
+    private final int numGuaranteedBuffers;
+
+    /** Buffers containing data for all subpartitions. */
+    private final ArrayDeque<BufferConsumer>[] buffers;
+
+    // 
---------------------------------------------------------------------------------------------
+    // Statistics and states
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Total number of bytes already appended to this sort buffer. */
+    private long numTotalBytes;
+
+    /** Total number of records already appended to this sort buffer. */
+    private long numTotalRecords;
+
+    /** Whether this sort buffer is full and ready to read data from. */
+    private boolean isFull;
+
+    /** Whether this sort buffer is finished. One can only read a finished 
sort buffer. */
+    private boolean isFinished;
+
+    /** Whether this sort buffer is released. A released sort buffer can not 
be used. */
+    private boolean isReleased;
+
+    // 
---------------------------------------------------------------------------------------------
+    // For writing
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Partial buffers to be appended data for each channel. */
+    private final BufferBuilder[] builders;
+
+    /** Total number of network buffers already occupied currently by this 
sort buffer. */
+    private int numBuffersOccupied;
+
+    // 
---------------------------------------------------------------------------------------------
+    // For reading
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Used to index the current available channel to read data from. */
+    private int readOrderIndex;
+
+    /** Data of different subpartitions in this sort buffer will be read in 
this order. */
+    private final int[] subpartitionReadOrder;
+
+    /** Total number of bytes already read from this sort buffer. */
+    private long numTotalBytesRead;
+
+    public HashBasedPartitionSortedBuffer(
+            BufferPool bufferPool,
+            int numSubpartitions,
+            int numGuaranteedBuffers,
+            @Nullable int[] customReadOrder) {
+        checkArgument(numGuaranteedBuffers > 0, "No guaranteed buffers for 
sort.");
+
+        this.bufferPool = checkNotNull(bufferPool);
+        this.numGuaranteedBuffers = numGuaranteedBuffers;
+
+        this.builders = new BufferBuilder[numSubpartitions];
+        this.buffers = new ArrayDeque[numSubpartitions];
+        for (int channel = 0; channel < numSubpartitions; ++channel) {
+            this.buffers[channel] = new ArrayDeque<>();
+        }
+
+        this.subpartitionReadOrder = new int[numSubpartitions];
+        if (customReadOrder != null) {
+            checkArgument(customReadOrder.length == numSubpartitions, "Illegal 
data read order.");
+            System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 
0, numSubpartitions);
+        } else {
+            for (int channel = 0; channel < numSubpartitions; ++channel) {
+                this.subpartitionReadOrder[channel] = channel;
+            }
+        }
+    }
+
+    @Override
+    public boolean append(ByteBuffer source, int targetChannel, 
Buffer.DataType dataType)
+            throws IOException {
+        checkArgument(source.hasRemaining(), "Cannot append empty data.");
+        checkState(!isFull, "Sort buffer is already full.");
+        checkState(!isFinished, "Sort buffer is already finished.");
+        checkState(!isReleased, "Sort buffer is already released.");
+
+        int totalBytes = source.remaining();
+        if (dataType.isBuffer()) {
+            writeRecord(source, targetChannel);
+        } else {
+            writeEvent(source, targetChannel, dataType);
+        }
+
+        isFull = source.hasRemaining();
+        if (!isFull) {
+            ++numTotalRecords;
+        }
+        numTotalBytes += totalBytes - source.remaining();

Review comment:
       If source takes 5 buffers and 3 buffers are written, do we expect to 
write the remaining buffers in the next buffer? If so might add some comments 
in the method docs?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -288,58 +303,70 @@ private SortBuffer getBroadcastSortBuffer() throws 
IOException {
             return broadcastSortBuffer;
         }
 
-        broadcastSortBuffer =
-                new PartitionSortedBuffer(
-                        bufferPool,
-                        numSubpartitions,
-                        networkBufferSize,
-                        numBuffersForSort,
-                        subpartitionOrder);
+        broadcastSortBuffer = createNewSortBuffer();
         return broadcastSortBuffer;
     }
 
+    private SortBuffer createNewSortBuffer() {
+        if (numBuffersForWrite > 0) {
+            return new SortBasedPartitionSortedBuffer(
+                    bufferPool,
+                    numSubpartitions,
+                    networkBufferSize,
+                    numBuffersForSort,
+                    subpartitionOrder);
+        } else {
+            return new HashBasedPartitionSortedBuffer(
+                    bufferPool, numSubpartitions, numBuffersForSort, 
subpartitionOrder);
+        }
+    }
+
     private void flushSortBuffer(SortBuffer sortBuffer, boolean isBroadcast) 
throws IOException {
-        if (sortBuffer == null || sortBuffer.isReleased()) {
+        if (sortBuffer == null || sortBuffer.isReleased() || 
!sortBuffer.hasRemaining()) {
             return;
         }
-        sortBuffer.finish();
-
-        if (sortBuffer.hasRemaining()) {
-            fileWriter.startNewRegion(isBroadcast);
 
-            List<BufferWithChannel> toWrite = new ArrayList<>();
-            Queue<MemorySegment> segments = getWriteSegments();
+        Queue<MemorySegment> segments = new ArrayDeque<>(writeSegments);
+        int numBuffersToWrite =
+                numBuffersForWrite == 0
+                        ? EXPECTED_WRITE_BATCH_SIZE
+                        : Math.min(EXPECTED_WRITE_BATCH_SIZE, segments.size());
+        List<BufferWithChannel> toWrite = new ArrayList<>(numBuffersToWrite);
 
-            while (sortBuffer.hasRemaining()) {
-                if (segments.isEmpty()) {
-                    fileWriter.writeBuffers(toWrite);
-                    toWrite.clear();
-                    segments = getWriteSegments();
-                }
-
-                BufferWithChannel bufferWithChannel =
-                        
sortBuffer.copyIntoSegment(checkNotNull(segments.poll()));
-                updateStatistics(bufferWithChannel.getBuffer(), isBroadcast);
-                toWrite.add(compressBufferIfPossible(bufferWithChannel));
+        fileWriter.startNewRegion(isBroadcast);
+        do {
+            if (toWrite.size() >= numBuffersToWrite) {
+                writeBuffers(toWrite);
+                segments = new ArrayDeque<>(writeSegments);
             }
 
-            fileWriter.writeBuffers(toWrite);
-        }
+            BufferWithChannel bufferWithChannel = 
sortBuffer.copyIntoSegment(segments.poll());

Review comment:
       It is also a bit weird that `copyIntoSegment` might pass a null segment. 
In consideration of the deadline I think we might rename the method to be like 
`getNextBuffer(@Nullable MemorySegment transitBuffer)` and add proper comments 
? 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -103,6 +111,12 @@
      */
     private int numBuffersForSort;
 
+    /**
+     * Number of reserved network buffers for data writing. This value can be 
0 and 0 means that
+     * {@link HashBasedPartitionSortedBuffer} will be used.
+     */
+    private int numBuffersForWrite;

Review comment:
       From the following modification, it seems the variable here mainly plays 
a role to indicate whether we want to use hash-based implementation and 
sort-based implementation. I think perhaps we could directly use a variable 
like `sortBufferType` or `useHashBuffer` to make it more clear. The 
`numBuffersForWrite` could be changed to be a local variable in the 
constructor. We could change the implementation to be like 
   
   ```
   if (numRequiredBuffer >= 2 * numSubpartitions) {
       useHashBuffer = true;
   } else {
       useHashBuffer = false;
   }
   
   if (!useHashBuffer) {
       int expectedWriteBuffers;
       if (numRequiredBuffer >= 2 * numSubpartitions) {
           expectedWriteBuffers = 0;
       } else if (networkBufferSize >= NUM_WRITE_BUFFER_BYTES) {
           expectedWriteBuffers = 1;
       } else {
           expectedWriteBuffers =
               Math.min(EXPECTED_WRITE_BATCH_SIZE, NUM_WRITE_BUFFER_BYTES / 
networkBufferSize);
       }
   
       int numBuffersForWrite = Math.min(numRequiredBuffer / 2, 
expectedWriteBuffers);
       numBuffersForSort = numRequiredBuffer - numBuffersForWrite;
   
       try {
           for (int i = 0; i < numBuffersForWrite; ++i) {
               MemorySegment segment = 
bufferPool.requestMemorySegmentBlocking();
               writeSegments.add(segment);
           }
       } catch (InterruptedException exception) {
           // the setup method does not allow InterruptedException
           throw new IOException(exception);
       }
   }
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortBuffer.java
##########
@@ -65,4 +64,7 @@ boolean append(ByteBuffer source, int targetChannel, 
Buffer.DataType dataType)
 
     /** Whether this {@link SortBuffer} is released or not. */
     boolean isReleased();
+
+    /** Resets this {@link SortBuffer} to be reused for data appending. */
+    void reset();

Review comment:
       Might move this method before `finish()`.
   
   Perhaps we could also add some description of the lifecycle of the 
`SortBuffer` in the class document? Like describe the process of `write, 
writer, full, read, read, reset, ..., finish, release`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to