This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fa1d32a9 [CELEBORN-1374] Refactor SortBuffer and PartitionSortedBuffer
7fa1d32a9 is described below

commit 7fa1d32a98ed98c147490d470fc687a971b21715
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 9 15:47:57 2024 +0800

    [CELEBORN-1374] Refactor SortBuffer and PartitionSortedBuffer
    
    ### What changes were proposed in this pull request?
    
    Refactor `SortBuffer` and `PartitionSortedBuffer` with introduction of 
`DataBuffer` and `SortBasedDataBuffer`.
    
    ### Why are the changes needed?
    
    `SortBuffer` and `PartitionSortedBuffer` is refactored in 
https://github.com/apache/flink/pull/18505. Celeborn Flink should also refactor 
`SortBuffer` and `PartitionSortedBuffer` to sync the interface changes in 
Flink. Meanwhile, `SortBuffer` and `PartitionSortedBuffer` should distinguish 
channel and subpartition for https://github.com/apache/flink/pull/23927.
    
    ### Does this PR introduce _any_ user-facing change?
    
    - `SortBuffer` renames to `DataBuffer`.
    - `PartitionSortedBuffer` renames to `SortBasedDataBuffer`.
    - `SortBuffer.BufferWithChannel` renames to `BufferWithSubpartition`
    
    ### How was this patch tested?
    
    UT and IT.
    
    Closes #2448 from SteNicholas/CELEBORN-1374.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 LICENSE                                            |   4 +-
 .../RemoteShuffleResultPartitionDelegation.java    | 136 ++++++++---------
 .../flink/buffer/BufferWithSubpartition.java       |  43 ++++++
 .../celeborn/plugin/flink/buffer/DataBuffer.java   |  74 +++++++++
 ...nSortedBuffer.java => SortBasedDataBuffer.java} | 100 +++++++------
 .../celeborn/plugin/flink/buffer/SortBuffer.java   |  92 ------------
 ...rSuiteJ.java => SortBasedDataBufferSuiteJ.java} | 165 +++++++++++----------
 .../plugin/flink/RemoteShuffleResultPartition.java |  27 ++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |   8 +-
 .../plugin/flink/RemoteShuffleResultPartition.java |  26 ++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |   8 +-
 .../plugin/flink/RemoteShuffleResultPartition.java |  28 ++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |   8 +-
 .../plugin/flink/RemoteShuffleResultPartition.java |  28 ++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |   8 +-
 .../plugin/flink/RemoteShuffleResultPartition.java |  28 ++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |   8 +-
 17 files changed, 396 insertions(+), 395 deletions(-)

diff --git a/LICENSE b/LICENSE
index 30b865ffc..3c8858ec9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -253,6 +253,6 @@ Remote Shuffle Service for Flink
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferRecycler.java
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/CreditListener.java
-./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
-./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
+./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
+./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
 
./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 703fcf10f..1bd4285ee 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -34,8 +34,9 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.celeborn.plugin.flink.buffer.PartitionSortedBuffer;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.buffer.SortBasedDataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
@@ -46,11 +47,11 @@ public class RemoteShuffleResultPartitionDelegation {
   /** Size of network buffer and write buffer. */
   public int networkBufferSize;
 
-  /** {@link SortBuffer} for records sent by broadcastRecord. */
-  public SortBuffer broadcastSortBuffer;
+  /** {@link DataBuffer} for records sent by broadcastRecord. */
+  public DataBuffer broadcastDataBuffer;
 
-  /** {@link SortBuffer} for records sent by emitRecord. */
-  public SortBuffer unicastSortBuffer;
+  /** {@link DataBuffer} for records sent by emitRecord. */
+  public DataBuffer unicastDataBuffer;
 
   /** Utility to spill data to shuffle workers. */
   public RemoteShuffleOutputGate outputGate;
@@ -58,17 +59,17 @@ public class RemoteShuffleResultPartitionDelegation {
   /** Whether notifyEndOfData has been called or not. */
   private boolean endOfDataNotified;
 
-  private int numSubpartitions;
+  private final int numSubpartitions;
   private BufferPool bufferPool;
   private BufferCompressor bufferCompressor;
   private Function<Buffer, Boolean> canBeCompressed;
   private Runnable checkProducerState;
-  private BiConsumer<SortBuffer.BufferWithChannel, Boolean> statisticsConsumer;
+  private final BiConsumer<BufferWithSubpartition, Boolean> statisticsConsumer;
 
   public RemoteShuffleResultPartitionDelegation(
       int networkBufferSize,
       RemoteShuffleOutputGate outputGate,
-      BiConsumer<SortBuffer.BufferWithChannel, Boolean> statisticsConsumer,
+      BiConsumer<BufferWithSubpartition, Boolean> statisticsConsumer,
       int numSubpartitions) {
     this.networkBufferSize = networkBufferSize;
     this.outputGate = outputGate;
@@ -105,92 +106,92 @@ public class RemoteShuffleResultPartitionDelegation {
           targetSubpartition == 0, "Target subpartition index can only be 0 
when broadcast.");
     }
 
-    SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : 
getUnicastSortBuffer();
-    if (sortBuffer.append(record, targetSubpartition, dataType)) {
+    DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() : 
getUnicastDataBuffer();
+    if (dataBuffer.append(record, targetSubpartition, dataType)) {
       return;
     }
 
     try {
-      if (!sortBuffer.hasRemaining()) {
-        // the record can not be appended to the free sort buffer because it 
is too large
-        sortBuffer.finish();
-        sortBuffer.release();
+      if (!dataBuffer.hasRemaining()) {
+        // the record can not be appended to the free data buffer because it 
is too large
+        dataBuffer.finish();
+        dataBuffer.release();
         writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
         return;
       }
-      flushSortBuffer(sortBuffer, isBroadcast);
+      flushDataBuffer(dataBuffer, isBroadcast);
     } catch (InterruptedException e) {
-      LOG.error("Failed to flush the sort buffer.", e);
+      LOG.error("Failed to flush the data buffer.", e);
       Utils.rethrowAsRuntimeException(e);
     }
     emit(record, targetSubpartition, dataType, isBroadcast);
   }
 
   @VisibleForTesting
-  public SortBuffer getUnicastSortBuffer() throws IOException {
-    flushBroadcastSortBuffer();
+  public DataBuffer getUnicastDataBuffer() throws IOException {
+    flushBroadcastDataBuffer();
 
-    if (unicastSortBuffer != null && !unicastSortBuffer.isFinished()) {
-      return unicastSortBuffer;
+    if (unicastDataBuffer != null && !unicastDataBuffer.isFinished()) {
+      return unicastDataBuffer;
     }
 
-    unicastSortBuffer =
-        new PartitionSortedBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
-    return unicastSortBuffer;
+    unicastDataBuffer =
+        new SortBasedDataBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
+    return unicastDataBuffer;
   }
 
-  public SortBuffer getBroadcastSortBuffer() throws IOException {
-    flushUnicastSortBuffer();
+  public DataBuffer getBroadcastDataBuffer() throws IOException {
+    flushUnicastDataBuffer();
 
-    if (broadcastSortBuffer != null && !broadcastSortBuffer.isFinished()) {
-      return broadcastSortBuffer;
+    if (broadcastDataBuffer != null && !broadcastDataBuffer.isFinished()) {
+      return broadcastDataBuffer;
     }
 
-    broadcastSortBuffer =
-        new PartitionSortedBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
-    return broadcastSortBuffer;
+    broadcastDataBuffer =
+        new SortBasedDataBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
+    return broadcastDataBuffer;
   }
 
-  public void flushBroadcastSortBuffer() throws IOException {
-    flushSortBuffer(broadcastSortBuffer, true);
+  public void flushBroadcastDataBuffer() throws IOException {
+    flushDataBuffer(broadcastDataBuffer, true);
   }
 
-  public void flushUnicastSortBuffer() throws IOException {
-    flushSortBuffer(unicastSortBuffer, false);
+  public void flushUnicastDataBuffer() throws IOException {
+    flushDataBuffer(unicastDataBuffer, false);
   }
 
   @VisibleForTesting
-  void flushSortBuffer(SortBuffer sortBuffer, boolean isBroadcast) throws 
IOException {
-    if (sortBuffer == null || sortBuffer.isReleased()) {
+  void flushDataBuffer(DataBuffer dataBuffer, boolean isBroadcast) throws 
IOException {
+    if (dataBuffer == null || dataBuffer.isReleased()) {
       return;
     }
-    sortBuffer.finish();
-    if (sortBuffer.hasRemaining()) {
+    dataBuffer.finish();
+    if (dataBuffer.hasRemaining()) {
       try {
         outputGate.regionStart(isBroadcast);
-        while (sortBuffer.hasRemaining()) {
+        while (dataBuffer.hasRemaining()) {
           MemorySegment segment = 
outputGate.getBufferPool().requestMemorySegmentBlocking();
-          SortBuffer.BufferWithChannel bufferWithChannel;
+          BufferWithSubpartition bufferWithSubpartition;
           try {
-            bufferWithChannel =
-                sortBuffer.copyIntoSegment(
+            bufferWithSubpartition =
+                dataBuffer.getNextBuffer(
                     segment, outputGate.getBufferPool(), 
BufferUtils.HEADER_LENGTH);
           } catch (Throwable t) {
             outputGate.getBufferPool().recycle(segment);
             throw new FlinkRuntimeException("Shuffle write failure.", t);
           }
 
-          Buffer buffer = bufferWithChannel.getBuffer();
-          int subpartitionIndex = bufferWithChannel.getChannelIndex();
-          statisticsConsumer.accept(bufferWithChannel, isBroadcast);
+          Buffer buffer = bufferWithSubpartition.getBuffer();
+          int subpartitionIndex = 
bufferWithSubpartition.getSubpartitionIndex();
+          statisticsConsumer.accept(bufferWithSubpartition, isBroadcast);
           writeCompressedBufferIfPossible(buffer, subpartitionIndex);
         }
         outputGate.regionFinish();
       } catch (InterruptedException e) {
-        throw new IOException("Failed to flush the sort buffer, broadcast=" + 
isBroadcast, e);
+        throw new IOException("Failed to flush the data buffer, broadcast=" + 
isBroadcast, e);
       }
     }
-    releaseSortBuffer(sortBuffer);
+    releaseDataBuffer(dataBuffer);
   }
 
   public void writeCompressedBufferIfPossible(Buffer buffer, int 
targetSubpartition)
@@ -234,9 +235,9 @@ public class RemoteShuffleResultPartitionDelegation {
               dataType,
               toCopy + BufferUtils.HEADER_LENGTH);
 
-      SortBuffer.BufferWithChannel bufferWithChannel =
-          new SortBuffer.BufferWithChannel(buffer, targetSubpartition);
-      statisticsConsumer.accept(bufferWithChannel, isBroadcast);
+      BufferWithSubpartition bufferWithSubpartition =
+          new BufferWithSubpartition(buffer, targetSubpartition);
+      statisticsConsumer.accept(bufferWithSubpartition, isBroadcast);
       writeCompressedBufferIfPossible(buffer, targetSubpartition);
     }
     outputGate.regionFinish();
@@ -246,17 +247,17 @@ public class RemoteShuffleResultPartitionDelegation {
     emit(record, 0, dataType, true);
   }
 
-  public void releaseSortBuffer(SortBuffer sortBuffer) {
-    if (sortBuffer != null) {
-      sortBuffer.release();
+  public void releaseDataBuffer(DataBuffer dataBuffer) {
+    if (dataBuffer != null) {
+      dataBuffer.release();
     }
   }
 
   public void finish() throws IOException {
     Utils.checkState(
-        unicastSortBuffer == null || unicastSortBuffer.isReleased(),
-        "The unicast sort buffer should be either null or released.");
-    flushBroadcastSortBuffer();
+        unicastDataBuffer == null || unicastDataBuffer.isReleased(),
+        "The unicast data buffer should be either null or released.");
+    flushBroadcastDataBuffer();
     try {
       outputGate.finish();
     } catch (InterruptedException e) {
@@ -265,22 +266,21 @@ public class RemoteShuffleResultPartitionDelegation {
   }
 
   public synchronized void close(Runnable closeHandler) {
-    Throwable closeException = null;
+    Throwable closeException;
     closeException =
         checkException(
-            () -> releaseSortBuffer(unicastSortBuffer),
-            closeException,
-            "Failed to release unicast sort buffer.");
+            () -> releaseDataBuffer(unicastDataBuffer),
+            null,
+            "Failed to release unicast data buffer.");
 
     closeException =
         checkException(
-            () -> releaseSortBuffer(broadcastSortBuffer),
+            () -> releaseDataBuffer(broadcastDataBuffer),
             closeException,
-            "Failed to release broadcast sort buffer.");
+            "Failed to release broadcast data buffer.");
 
     closeException =
-        checkException(
-            () -> closeHandler.run(), closeException, "Failed to call 
super#close() method.");
+        checkException(closeHandler, closeException, "Failed to call 
super#close() method.");
 
     try {
       outputGate.close();
@@ -307,10 +307,10 @@ public class RemoteShuffleResultPartitionDelegation {
 
   public void flushAll() {
     try {
-      flushUnicastSortBuffer();
-      flushBroadcastSortBuffer();
+      flushUnicastDataBuffer();
+      flushBroadcastDataBuffer();
     } catch (Throwable t) {
-      LOG.error("Failed to flush the current sort buffer.", t);
+      LOG.error("Failed to flush the current data buffer.", t);
       Utils.rethrowAsRuntimeException(t);
     }
   }
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
new file mode 100644
index 000000000..2d2f767a9
--- /dev/null
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferWithSubpartition.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** Buffer and the corresponding subpartition index. */
+public class BufferWithSubpartition {
+
+  private final Buffer buffer;
+
+  private final int subpartitionIndex;
+
+  public BufferWithSubpartition(Buffer buffer, int subpartitionIndex) {
+    this.buffer = checkNotNull(buffer);
+    this.subpartitionIndex = subpartitionIndex;
+  }
+
+  public Buffer getBuffer() {
+    return buffer;
+  }
+
+  public int getSubpartitionIndex() {
+    return subpartitionIndex;
+  }
+}
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
new file mode 100644
index 000000000..562db900f
--- /dev/null
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * Data of different subpartitions can be appended to a {@link DataBuffer} and 
after the {@link
+ * DataBuffer} is full or finished, the appended data can be copied from it in 
subpartition index
+ * order.
+ *
+ * <p>The lifecycle of a {@link DataBuffer} can be: new, write, [read, reset, 
write], finish, read,
+ * release. There can be multiple [read, reset, write] operations before 
finish.
+ */
+public interface DataBuffer {
+
+  /**
+   * Appends data of the specified subpartition to this {@link DataBuffer} and 
returns true if this
+   * {@link DataBuffer} is full.
+   */
+  boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType 
dataType)
+      throws IOException;
+
+  /**
+   * Copies data in this {@link DataBuffer} to the target {@link 
MemorySegment} in subpartition
+   * index order and returns {@link BufferWithSubpartition} which contains the 
copied data and the
+   * corresponding subpartition index.
+   */
+  BufferWithSubpartition getNextBuffer(
+      @Nullable MemorySegment transitBuffer, BufferRecycler recycler, int 
offset);
+
+  /** Returns the total number of records written to this {@link DataBuffer}. 
*/
+  long numTotalRecords();
+
+  /** Returns the total number of bytes written to this {@link DataBuffer}. */
+  long numTotalBytes();
+
+  /** Returns true if not all data appended to this {@link DataBuffer} is 
consumed. */
+  boolean hasRemaining();
+
+  /** Finishes this {@link DataBuffer} which means no record can be appended 
anymore. */
+  void finish();
+
+  /** Whether this {@link DataBuffer} is finished or not. */
+  boolean isFinished();
+
+  /** Releases this {@link DataBuffer} which releases all resources. */
+  void release();
+
+  /** Whether this {@link DataBuffer} is released or not. */
+  boolean isReleased();
+}
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
similarity index 79%
rename from 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
index c11e5d373..fafc2c8cd 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java
@@ -38,19 +38,20 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.util.FlinkRuntimeException;
 
 /**
- * A {@link SortBuffer} implementation which sorts all appended records only 
by subpartition index.
+ * A {@link DataBuffer} implementation which sorts all appended records only 
by subpartition index.
  * Records of the same subpartition keep the appended order.
  *
  * <p>It maintains a list of {@link MemorySegment}s as a joint buffer. Data 
will be appended to the
  * joint buffer sequentially. When writing a record, an index entry will be 
appended first. An index
- * entry consists of 4 fields: 4 bytes for record length, 4 bytes for {@link 
DataType} and 8 bytes
- * for address pointing to the next index entry of the same channel which will 
be used to index the
- * next record to read when coping data from this {@link SortBuffer}. For 
simplicity, no index entry
- * can span multiple segments. The corresponding record data is seated right 
after its index entry
- * and different from the index entry, records have variable length thus may 
span multiple segments.
+ * entry consists of 4 fields: 4 bytes for record length, 4 bytes for {@link 
Buffer.DataType} and 8
+ * bytes for address pointing to the next index entry of the same subpartition 
which will be used to
+ * index the next record to read when coping data from this {@link 
DataBuffer}. For simplicity, no
+ * index entry can span multiple segments. The corresponding record data is 
seated right after its
+ * index entry and different from the index entry, records have variable 
length thus may span
+ * multiple segments.
  */
 @NotThreadSafe
-public class PartitionSortedBuffer implements SortBuffer {
+public class SortBasedDataBuffer implements DataBuffer {
 
   /**
    * Size of an index entry: 4 bytes for record length, 4 bytes for data type 
and 8 bytes for
@@ -71,25 +72,25 @@ public class PartitionSortedBuffer implements SortBuffer {
   private final long[] lastIndexEntryAddresses;
   /** Size of buffers requested from buffer pool. All buffers must be of the 
same size. */
   private final int bufferSize;
-  /** Data of different subpartitions in this sort buffer will be read in this 
order. */
+  /** Data of different subpartitions in this data buffer will be read in this 
order. */
   private final int[] subpartitionReadOrder;
 
   // 
---------------------------------------------------------------------------------------------
   // Statistics and states
   // 
---------------------------------------------------------------------------------------------
-  /** Total number of bytes already appended to this sort buffer. */
+  /** Total number of bytes already appended to this data buffer. */
   private long numTotalBytes;
-  /** Total number of records already appended to this sort buffer. */
+  /** Total number of records already appended to this data buffer. */
   private long numTotalRecords;
-  /** Total number of bytes already read from this sort buffer. */
+  /** Total number of bytes already read from this data buffer. */
   private long numTotalBytesRead;
-  /** Whether this sort buffer is finished. One can only read a finished sort 
buffer. */
+  /** Whether this data buffer is finished. One can only read a finished data 
buffer. */
   private boolean isFinished;
 
   // 
---------------------------------------------------------------------------------------------
   // For writing
   // 
---------------------------------------------------------------------------------------------
-  /** Whether this sort buffer is released. A released sort buffer can not be 
used. */
+  /** Whether this data buffer is released. A released data buffer can not be 
used. */
   private boolean isReleased;
   /** Array index in the segment list of the current available buffer for 
writing. */
   private int writeSegmentIndex;
@@ -105,10 +106,10 @@ public class PartitionSortedBuffer implements SortBuffer {
   /** Record bytes remaining after last copy, which must be read first in next 
copy. */
   private int recordRemainingBytes;
 
-  /** Used to index the current available channel to read data from. */
+  /** Used to index the current available subpartition to read data from. */
   private int readOrderIndex = -1;
 
-  public PartitionSortedBuffer(
+  public SortBasedDataBuffer(
       BufferPool bufferPool,
       int numSubpartitions,
       int bufferSize,
@@ -120,7 +121,7 @@ public class PartitionSortedBuffer implements SortBuffer {
     this.firstIndexEntryAddresses = new long[numSubpartitions];
     this.lastIndexEntryAddresses = new long[numSubpartitions];
 
-    // initialized with -1 means the corresponding channel has no data.
+    // initialized with -1 means the corresponding subpartition has no data.
     Arrays.fill(firstIndexEntryAddresses, -1L);
     Arrays.fill(lastIndexEntryAddresses, -1L);
 
@@ -129,18 +130,18 @@ public class PartitionSortedBuffer implements SortBuffer {
       checkArgument(customReadOrder.length == numSubpartitions, "Illegal data 
read order.");
       System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, 
numSubpartitions);
     } else {
-      for (int channel = 0; channel < numSubpartitions; ++channel) {
-        this.subpartitionReadOrder[channel] = channel;
+      for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
+        this.subpartitionReadOrder[subpartition] = subpartition;
       }
     }
   }
 
   @Override
-  public boolean append(ByteBuffer source, int targetChannel, DataType 
dataType)
+  public boolean append(ByteBuffer source, int targetSubpartition, DataType 
dataType)
       throws IOException {
     checkArgument(source.hasRemaining(), "Cannot append empty data.");
-    checkState(!isFinished, "Sort buffer is already finished.");
-    checkState(!isReleased, "Sort buffer is already released.");
+    checkState(!isFinished, "Data buffer is already finished.");
+    checkState(!isReleased, "Data buffer is already released.");
 
     int totalBytes = source.remaining();
 
@@ -150,7 +151,7 @@ public class PartitionSortedBuffer implements SortBuffer {
     }
 
     // write the index entry and record or event data
-    writeIndex(targetChannel, totalBytes, dataType);
+    writeIndex(targetSubpartition, totalBytes, dataType);
     writeRecord(source);
 
     ++numTotalRecords;
@@ -159,7 +160,7 @@ public class PartitionSortedBuffer implements SortBuffer {
     return true;
   }
 
-  private void writeIndex(int channelIndex, int numRecordBytes, DataType 
dataType) {
+  private void writeIndex(int subpartitionIndex, int numRecordBytes, DataType 
dataType) {
     MemorySegment segment = buffers.get(writeSegmentIndex);
 
     // record length takes the high 32 bits and data type takes the low 32 bits
@@ -168,15 +169,15 @@ public class PartitionSortedBuffer implements SortBuffer {
     // segment index takes the high 32 bits and segment offset takes the low 
32 bits
     long indexEntryAddress = ((long) writeSegmentIndex << 32) | 
writeSegmentOffset;
 
-    long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
-    lastIndexEntryAddresses[channelIndex] = indexEntryAddress;
+    long lastIndexEntryAddress = lastIndexEntryAddresses[subpartitionIndex];
+    lastIndexEntryAddresses[subpartitionIndex] = indexEntryAddress;
 
     if (lastIndexEntryAddress >= 0) {
-      // link the previous index entry of the given channel to the new index 
entry
+      // link the previous index entry of the given subpartition to the new 
index entry
       segment = buffers.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
       segment.putLong(getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, 
indexEntryAddress);
     } else {
-      firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
+      firstIndexEntryAddresses[subpartitionIndex] = indexEntryAddress;
     }
 
     // move the writer position forward to write the corresponding record
@@ -232,7 +233,7 @@ public class PartitionSortedBuffer implements SortBuffer {
 
     if (isReleased) {
       bufferPool.recycle(segment);
-      throw new IllegalStateException("Sort buffer is already released.");
+      throw new IllegalStateException("Data buffer is already released.");
     }
 
     buffers.add(segment);
@@ -262,15 +263,15 @@ public class PartitionSortedBuffer implements SortBuffer {
   }
 
   @Override
-  public BufferWithChannel copyIntoSegment(
-      MemorySegment target, BufferRecycler recycler, int offset) {
+  public BufferWithSubpartition getNextBuffer(
+      MemorySegment transitBuffer, BufferRecycler recycler, int offset) {
     checkState(hasRemaining(), "No data remaining.");
-    checkState(isFinished, "Should finish the sort buffer first before coping 
any data.");
-    checkState(!isReleased, "Sort buffer is already released.");
+    checkState(isFinished, "Should finish the data buffer first before coping 
any data.");
+    checkState(!isReleased, "Data buffer is already released.");
 
     int numBytesCopied = 0;
     DataType bufferDataType = DataType.DATA_BUFFER;
-    int channelIndex = subpartitionReadOrder[readOrderIndex];
+    int subpartitionIndex = subpartitionReadOrder[readOrderIndex];
 
     do {
       int sourceSegmentIndex = 
getSegmentIndexFromPointer(readIndexEntryAddress);
@@ -292,27 +293,32 @@ public class PartitionSortedBuffer implements SortBuffer {
       sourceSegmentOffset += INDEX_ENTRY_SIZE;
 
       // throws if the event is too big to be accommodated by a buffer.
-      if (bufferDataType.isEvent() && target.size() - offset < length) {
+      if (bufferDataType.isEvent() && transitBuffer.size() - offset < length) {
         throw new FlinkRuntimeException("Event is too big to be accommodated 
by a buffer");
       }
 
       numBytesCopied +=
           copyRecordOrEvent(
-              target, numBytesCopied + offset, sourceSegmentIndex, 
sourceSegmentOffset, length);
+              transitBuffer,
+              numBytesCopied + offset,
+              sourceSegmentIndex,
+              sourceSegmentOffset,
+              length);
 
       if (recordRemainingBytes == 0) {
-        // move to next channel if the current channel has been finished
-        if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
+        // move to next subpartition if the current subpartition has been 
finished
+        if (readIndexEntryAddress == 
lastIndexEntryAddresses[subpartitionIndex]) {
           updateReadChannelAndIndexEntryAddress();
           break;
         }
         readIndexEntryAddress = nextReadIndexEntryAddress;
       }
-    } while (numBytesCopied < target.size() - offset && 
bufferDataType.isBuffer());
+    } while (numBytesCopied < transitBuffer.size() - offset && 
bufferDataType.isBuffer());
 
     numTotalBytesRead += numBytesCopied;
-    Buffer buffer = new NetworkBuffer(target, recycler, bufferDataType, 
numBytesCopied + offset);
-    return new BufferWithChannel(buffer, channelIndex);
+    Buffer buffer =
+        new NetworkBuffer(transitBuffer, recycler, bufferDataType, 
numBytesCopied + offset);
+    return new BufferWithSubpartition(buffer, subpartitionIndex);
   }
 
   private int copyRecordOrEvent(
@@ -354,10 +360,10 @@ public class PartitionSortedBuffer implements SortBuffer {
   }
 
   private void updateReadChannelAndIndexEntryAddress() {
-    // skip the channels without any data
+    // skip the subpartitions without any data
     while (++readOrderIndex < firstIndexEntryAddresses.length) {
-      int channelIndex = subpartitionReadOrder[readOrderIndex];
-      if ((readIndexEntryAddress = firstIndexEntryAddresses[channelIndex]) >= 
0) {
+      int subpartitionIndex = subpartitionReadOrder[readOrderIndex];
+      if ((readIndexEntryAddress = 
firstIndexEntryAddresses[subpartitionIndex]) >= 0) {
         break;
       }
     }
@@ -372,12 +378,12 @@ public class PartitionSortedBuffer implements SortBuffer {
   }
 
   @Override
-  public long numRecords() {
+  public long numTotalRecords() {
     return numTotalRecords;
   }
 
   @Override
-  public long numBytes() {
+  public long numTotalBytes() {
     return numTotalBytes;
   }
 
@@ -388,7 +394,7 @@ public class PartitionSortedBuffer implements SortBuffer {
 
   @Override
   public void finish() {
-    checkState(!isFinished, SortBuffer.class.getCanonicalName() + " is already 
finished.");
+    checkState(!isFinished, DataBuffer.class.getCanonicalName() + " is already 
finished.");
 
     isFinished = true;
 
@@ -403,7 +409,7 @@ public class PartitionSortedBuffer implements SortBuffer {
 
   @Override
   public void release() {
-    // the sort buffer can be released by other threads
+    // the data buffer can be released by other threads
     if (isReleased) {
       return;
     }
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
deleted file mode 100644
index 7318d6bb1..000000000
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.plugin.flink.buffer;
-
-import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * Data of different channels can be appended to a {@link SortBuffer}., after 
appending finished,
- * data can be copied from it in channel index order.
- */
-public interface SortBuffer {
-
-  /**
-   * Appends data of the specified channel to this {@link SortBuffer} and 
returns true if all bytes
-   * of the source buffer is copied to this {@link SortBuffer} successfully, 
otherwise if returns
-   * false, nothing will be copied.
-   */
-  boolean append(ByteBuffer source, int targetChannel, Buffer.DataType 
dataType) throws IOException;
-
-  /**
-   * Copies data from this {@link SortBuffer} to the target {@link 
MemorySegment} in channel index
-   * order and returns {@link BufferWithChannel} which contains the copied 
data and the
-   * corresponding channel index.
-   */
-  BufferWithChannel copyIntoSegment(MemorySegment target, BufferRecycler 
recycler, int offset);
-
-  /** Returns the number of records written to this {@link SortBuffer}. */
-  long numRecords();
-
-  /** Returns the number of bytes written to this {@link SortBuffer}. */
-  long numBytes();
-
-  /** Returns true if there is still data can be consumed in this {@link 
SortBuffer}. */
-  boolean hasRemaining();
-
-  /** Finishes this {@link SortBuffer} which means no record can be appended 
anymore. */
-  void finish();
-
-  /** Whether this {@link SortBuffer} is finished or not. */
-  boolean isFinished();
-
-  /** Releases this {@link SortBuffer} which releases all resources. */
-  void release();
-
-  /** Whether this {@link SortBuffer} is released or not. */
-  boolean isReleased();
-
-  /** Buffer and the corresponding channel index returned to reader. */
-  class BufferWithChannel {
-
-    private final Buffer buffer;
-
-    private final int channelIndex;
-
-    public BufferWithChannel(Buffer buffer, int channelIndex) {
-      this.buffer = checkNotNull(buffer);
-      this.channelIndex = channelIndex;
-    }
-
-    /** Get {@link Buffer}. */
-    public Buffer getBuffer() {
-      return buffer;
-    }
-
-    /** Get channel index. */
-    public int getChannelIndex() {
-      return channelIndex;
-    }
-  }
-}
diff --git 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
similarity index 66%
rename from 
client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
rename to 
client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
index b1f5ced91..e68ebda99 100644
--- 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PartitionSortedBufferSuiteJ.java
+++ 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/SortBasedDataBufferSuiteJ.java
@@ -38,18 +38,19 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.junit.Test;
 
-import org.apache.celeborn.plugin.flink.buffer.PartitionSortedBuffer;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.buffer.SortBasedDataBuffer;
 
-public class PartitionSortedBufferSuiteJ {
+public class SortBasedDataBufferSuiteJ {
   @Test
-  public void testWriteAndReadSortBuffer() throws Exception {
+  public void testWriteAndReadDataBuffer() throws Exception {
     int numSubpartitions = 10;
     int bufferSize = 1024;
     int bufferPoolSize = 1000;
     Random random = new Random(1111);
 
-    // used to store data written to and read from sort buffer for correctness 
check
+    // used to store data written to and read from data buffer for correctness 
check
     Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
     Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
     for (int i = 0; i < numSubpartitions; ++i) {
@@ -62,10 +63,10 @@ public class PartitionSortedBufferSuiteJ {
     Arrays.fill(numBytesWritten, 0);
     Arrays.fill(numBytesRead, 0);
 
-    // fill the sort buffer with randomly generated data
+    // fill the data buffer with randomly generated data
     int totalBytesWritten = 0;
-    SortBuffer sortBuffer =
-        createSortBuffer(
+    DataBuffer dataBuffer =
+        createDataBuffer(
             bufferPoolSize,
             bufferSize,
             numSubpartitions,
@@ -86,8 +87,8 @@ public class PartitionSortedBufferSuiteJ {
       boolean isBuffer = random.nextBoolean() || recordSize > bufferSize;
       Buffer.DataType dataType =
           isBuffer ? Buffer.DataType.DATA_BUFFER : 
Buffer.DataType.EVENT_BUFFER;
-      if (!sortBuffer.append(record, subpartition, dataType)) {
-        sortBuffer.finish();
+      if (!dataBuffer.append(record, subpartition, dataType)) {
+        dataBuffer.finish();
         break;
       }
       record.rewind();
@@ -96,17 +97,17 @@ public class PartitionSortedBufferSuiteJ {
       totalBytesWritten += recordSize;
     }
 
-    // read all data from the sort buffer
-    while (sortBuffer.hasRemaining()) {
+    // read all data from the data buffer
+    while (dataBuffer.hasRemaining()) {
       MemorySegment readBuffer = 
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
-      SortBuffer.BufferWithChannel bufferAndChannel =
-          sortBuffer.copyIntoSegment(readBuffer, ignore -> {}, 0);
-      int subpartition = bufferAndChannel.getChannelIndex();
-      buffersRead[subpartition].add(bufferAndChannel.getBuffer());
-      numBytesRead[subpartition] += 
bufferAndChannel.getBuffer().readableBytes();
+      BufferWithSubpartition bufferWithSubpartition =
+          dataBuffer.getNextBuffer(readBuffer, ignore -> {}, 0);
+      int subpartition = bufferWithSubpartition.getSubpartitionIndex();
+      buffersRead[subpartition].add(bufferWithSubpartition.getBuffer());
+      numBytesRead[subpartition] += 
bufferWithSubpartition.getBuffer().readableBytes();
     }
 
-    assertEquals(totalBytesWritten, sortBuffer.numBytes());
+    assertEquals(totalBytesWritten, dataBuffer.numTotalBytes());
     checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, 
dataWritten, buffersRead);
   }
 
@@ -161,68 +162,68 @@ public class PartitionSortedBufferSuiteJ {
       ByteBuffer.allocate(128), null, ByteBuffer.allocate(1536), null, 
ByteBuffer.allocate(1024)
     };
 
-    SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 
numSubpartitions);
+    DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 
numSubpartitions);
     for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
       ByteBuffer record = subpartitionRecords[subpartition];
       if (record != null) {
-        sortBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
+        dataBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
         record.rewind();
       }
     }
-    sortBuffer.finish();
+    dataBuffer.finish();
 
-    checkReadResult(sortBuffer, subpartitionRecords[0], 0, bufferSize);
+    checkReadResult(dataBuffer, subpartitionRecords[0], 0, bufferSize);
 
     ByteBuffer expected1 = subpartitionRecords[2].duplicate();
     expected1.limit(bufferSize);
-    checkReadResult(sortBuffer, expected1.slice(), 2, bufferSize);
+    checkReadResult(dataBuffer, expected1.slice(), 2, bufferSize);
 
     ByteBuffer expected2 = subpartitionRecords[2].duplicate();
     expected2.position(bufferSize);
-    checkReadResult(sortBuffer, expected2.slice(), 2, bufferSize);
+    checkReadResult(dataBuffer, expected2.slice(), 2, bufferSize);
 
-    checkReadResult(sortBuffer, subpartitionRecords[4], 4, bufferSize);
+    checkReadResult(dataBuffer, subpartitionRecords[4], 4, bufferSize);
   }
 
   private void checkReadResult(
-      SortBuffer sortBuffer, ByteBuffer expectedBuffer, int expectedChannel, 
int bufferSize) {
+      DataBuffer dataBuffer, ByteBuffer expectedBuffer, int 
expectedSubpartition, int bufferSize) {
     MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
-    SortBuffer.BufferWithChannel bufferWithChannel =
-        sortBuffer.copyIntoSegment(segment, ignore -> {}, 0);
-    assertEquals(expectedChannel, bufferWithChannel.getChannelIndex());
-    assertEquals(expectedBuffer, 
bufferWithChannel.getBuffer().getNioBufferReadable());
+    BufferWithSubpartition bufferWithSubpartition =
+        dataBuffer.getNextBuffer(segment, ignore -> {}, 0);
+    assertEquals(expectedSubpartition, 
bufferWithSubpartition.getSubpartitionIndex());
+    assertEquals(expectedBuffer, 
bufferWithSubpartition.getBuffer().getNioBufferReadable());
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testWriteEmptyData() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
 
     ByteBuffer record = ByteBuffer.allocate(1);
     record.position(1);
 
-    sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
+    dataBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testWriteFinishedSortBuffer() throws Exception {
+  public void testWriteFinishedDataBuffer() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
-    sortBuffer.finish();
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+    dataBuffer.finish();
 
-    sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+    dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testWriteReleasedSortBuffer() throws Exception {
+  public void testWriteReleasedDataBuffer() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
-    sortBuffer.release();
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+    dataBuffer.release();
 
-    sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+    dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
   }
 
   @Test
@@ -230,15 +231,15 @@ public class PartitionSortedBufferSuiteJ {
     int bufferPoolSize = 10;
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);
+    DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);
 
     for (int i = 1; i < bufferPoolSize; ++i) {
-      appendAndCheckResult(sortBuffer, bufferSize, true, bufferSize * i, i, 
true);
+      appendAndCheckResult(dataBuffer, bufferSize, true, bufferSize * i, i, 
true);
     }
 
     // append should fail for insufficient capacity
     int numRecords = bufferPoolSize - 1;
-    appendAndCheckResult(sortBuffer, bufferSize, false, bufferSize * 
numRecords, numRecords, true);
+    appendAndCheckResult(dataBuffer, bufferSize, false, bufferSize * 
numRecords, numRecords, true);
   }
 
   @Test
@@ -246,13 +247,13 @@ public class PartitionSortedBufferSuiteJ {
     int bufferPoolSize = 10;
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);
+    DataBuffer dataBuffer = createDataBuffer(bufferPoolSize, bufferSize, 1);
     // append should fail for insufficient capacity
-    appendAndCheckResult(sortBuffer, bufferPoolSize * bufferSize, false, 0, 0, 
false);
+    appendAndCheckResult(dataBuffer, bufferPoolSize * bufferSize, false, 0, 0, 
false);
   }
 
   private void appendAndCheckResult(
-      SortBuffer sortBuffer,
+      DataBuffer dataBuffer,
       int recordSize,
       boolean isSuccessful,
       long numBytes,
@@ -261,54 +262,54 @@ public class PartitionSortedBufferSuiteJ {
       throws IOException {
     ByteBuffer largeRecord = ByteBuffer.allocate(recordSize);
 
-    assertEquals(isSuccessful, sortBuffer.append(largeRecord, 0, 
Buffer.DataType.DATA_BUFFER));
-    assertEquals(numBytes, sortBuffer.numBytes());
-    assertEquals(numRecords, sortBuffer.numRecords());
-    assertEquals(hasRemaining, sortBuffer.hasRemaining());
+    assertEquals(isSuccessful, dataBuffer.append(largeRecord, 0, 
Buffer.DataType.DATA_BUFFER));
+    assertEquals(numBytes, dataBuffer.numTotalBytes());
+    assertEquals(numRecords, dataBuffer.numTotalRecords());
+    assertEquals(hasRemaining, dataBuffer.hasRemaining());
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testReadUnfinishedSortBuffer() throws Exception {
+  public void testReadUnfinishedDataBuffer() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
-    sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+    dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
 
-    assertTrue(sortBuffer.hasRemaining());
-    sortBuffer.copyIntoSegment(
+    assertTrue(dataBuffer.hasRemaining());
+    dataBuffer.getNextBuffer(
         MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore -> 
{}, 0);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testReadReleasedSortBuffer() throws Exception {
+  public void testReadReleasedDataBuffer() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
-    sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
-    sortBuffer.finish();
-    assertTrue(sortBuffer.hasRemaining());
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+    dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
+    dataBuffer.finish();
+    assertTrue(dataBuffer.hasRemaining());
 
-    sortBuffer.release();
-    assertFalse(sortBuffer.hasRemaining());
+    dataBuffer.release();
+    assertFalse(dataBuffer.hasRemaining());
 
-    sortBuffer.copyIntoSegment(
+    dataBuffer.getNextBuffer(
         MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore -> 
{}, 0);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testReadEmptySortBuffer() throws Exception {
+  public void testReadEmptyDataBuffer() throws Exception {
     int bufferSize = 1024;
 
-    SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);
-    sortBuffer.finish();
+    DataBuffer dataBuffer = createDataBuffer(1, bufferSize, 1);
+    dataBuffer.finish();
 
-    assertFalse(sortBuffer.hasRemaining());
-    sortBuffer.copyIntoSegment(
+    assertFalse(dataBuffer.hasRemaining());
+    dataBuffer.getNextBuffer(
         MemorySegmentFactory.allocateUnpooledSegment(bufferSize), ignore -> 
{}, 0);
   }
 
   @Test
-  public void testReleaseSortBuffer() throws Exception {
+  public void testReleaseDataBuffer() throws Exception {
     int bufferPoolSize = 10;
     int bufferSize = 1024;
     int recordSize = (bufferPoolSize - 1) * bufferSize;
@@ -316,34 +317,34 @@ public class PartitionSortedBufferSuiteJ {
     NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, 
bufferSize);
     BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, 
bufferPoolSize);
 
-    SortBuffer sortBuffer = new PartitionSortedBuffer(bufferPool, 1, 
bufferSize, null);
-    sortBuffer.append(ByteBuffer.allocate(recordSize), 0, 
Buffer.DataType.DATA_BUFFER);
+    DataBuffer dataBuffer = new SortBasedDataBuffer(bufferPool, 1, bufferSize, 
null);
+    dataBuffer.append(ByteBuffer.allocate(recordSize), 0, 
Buffer.DataType.DATA_BUFFER);
 
     assertEquals(bufferPoolSize, bufferPool.bestEffortGetNumOfUsedBuffers());
-    assertTrue(sortBuffer.hasRemaining());
-    assertEquals(1, sortBuffer.numRecords());
-    assertEquals(recordSize, sortBuffer.numBytes());
+    assertTrue(dataBuffer.hasRemaining());
+    assertEquals(1, dataBuffer.numTotalRecords());
+    assertEquals(recordSize, dataBuffer.numTotalBytes());
 
     // should release all data and resources
-    sortBuffer.release();
+    dataBuffer.release();
     assertEquals(0, bufferPool.bestEffortGetNumOfUsedBuffers());
-    assertFalse(sortBuffer.hasRemaining());
-    assertEquals(0, sortBuffer.numRecords());
-    assertEquals(0, sortBuffer.numBytes());
+    assertFalse(dataBuffer.hasRemaining());
+    assertEquals(0, dataBuffer.numTotalRecords());
+    assertEquals(0, dataBuffer.numTotalBytes());
   }
 
-  private SortBuffer createSortBuffer(int bufferPoolSize, int bufferSize, int 
numSubpartitions)
+  private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int 
numSubpartitions)
       throws IOException {
-    return createSortBuffer(bufferPoolSize, bufferSize, numSubpartitions, 
null);
+    return createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, 
null);
   }
 
-  private SortBuffer createSortBuffer(
+  private DataBuffer createDataBuffer(
       int bufferPoolSize, int bufferSize, int numSubpartitions, int[] 
customReadOrder)
       throws IOException {
     NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, 
bufferSize);
     BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, 
bufferPoolSize);
 
-    return new PartitionSortedBuffer(bufferPool, numSubpartitions, bufferSize, 
customReadOrder);
+    return new SortBasedDataBuffer(bufferPool, numSubpartitions, bufferSize, 
customReadOrder);
   }
 
   public static int[] getRandomSubpartitionOrder(int numSubpartitions) {
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index d28e78285..fa8b8692a 100644
--- 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -40,16 +40,16 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
- * A {@link ResultPartition} which appends records and events to {@link 
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be 
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link 
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be 
copied and spilled to the
  * remote shuffle service in subpartition index order sequentially. Large 
records that can not be
- * appended to an empty {@link 
org.apache.flink.runtime.io.network.partition.SortBuffer} will be
- * spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
  */
 public class RemoteShuffleResultPartition extends ResultPartition {
 
@@ -81,10 +81,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
     delegation =
         new RemoteShuffleResultPartitionDelegation(
-            networkBufferSize,
-            outputGate,
-            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
-            numSubpartitions);
+            networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
   }
 
   @Override
@@ -92,10 +89,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     super.setup();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
-        bufferPool,
-        bufferCompressor,
-        buffer -> canBeCompressed(buffer),
-        () -> checkInProduceState());
+        bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
   }
 
   @Override
@@ -129,7 +123,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   @Override
   public synchronized void close() {
-    delegation.close(() -> super.close());
+    delegation.close(super::close);
   }
 
   @Override
@@ -199,11 +193,10 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     return delegation;
   }
 
-  public void updateStatistics(
-      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+  public void updateStatistics(BufferWithSubpartition bufferWithSubpartition, 
boolean isBroadcast) {
     numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
     long readableBytes =
-        (long) bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+        (long) bufferWithSubpartition.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
     numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
   }
 }
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
--- 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
     doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
     doNothing().when(remoteShuffleOutputGate).regionFinish();
     
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    DataBuffer dataBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+    dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer, 
true);
   }
 
   private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
diff --git 
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 3b25a4a0a..3e35b788f 100644
--- 
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -36,15 +36,16 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
- * A {@link ResultPartition} which appends records and events to {@link 
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be 
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link 
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be 
copied and spilled to the
  * remote shuffle service in subpartition index order sequentially. Large 
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
  */
 public class RemoteShuffleResultPartition extends ResultPartition {
 
@@ -76,10 +77,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
     delegation =
         new RemoteShuffleResultPartitionDelegation(
-            networkBufferSize,
-            outputGate,
-            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
-            numSubpartitions);
+            networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
   }
 
   @Override
@@ -87,10 +85,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     super.setup();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
-        bufferPool,
-        bufferCompressor,
-        buffer -> canBeCompressed(buffer),
-        () -> checkInProduceState());
+        bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
   }
 
   @Override
@@ -124,7 +119,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   @Override
   public synchronized void close() {
-    delegation.close(() -> super.close());
+    delegation.close(super::close);
   }
 
   @Override
@@ -199,11 +194,10 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     return delegation;
   }
 
-  public void updateStatistics(
-      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+  public void updateStatistics(BufferWithSubpartition bufferWithSubpartition, 
boolean isBroadcast) {
     numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
     long readableBytes =
-        (long) bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+        (long) bufferWithSubpartition.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
     numBytesProduced.inc(readableBytes);
     numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
   }
diff --git 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
--- 
a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
     doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
     doNothing().when(remoteShuffleOutputGate).regionFinish();
     
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    DataBuffer dataBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+    dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer, 
true);
   }
 
   private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
--- 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
- * A {@link ResultPartition} which appends records and events to {@link 
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be 
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link 
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be 
copied and spilled to the
  * remote shuffle service in subpartition index order sequentially. Large 
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
  */
 public class RemoteShuffleResultPartition extends ResultPartition {
 
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
     delegation =
         new RemoteShuffleResultPartitionDelegation(
-            networkBufferSize,
-            outputGate,
-            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
-            numSubpartitions);
+            networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
   }
 
   @Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     super.setup();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
-        bufferPool,
-        bufferCompressor,
-        buffer -> canBeCompressed(buffer),
-        () -> checkInProduceState());
+        bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
   }
 
   @Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   @Override
   public synchronized void close() {
-    delegation.close(() -> super.close());
+    delegation.close(super::close);
   }
 
   @Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     return delegation;
   }
 
-  public void updateStatistics(
-      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+  public void updateStatistics(BufferWithSubpartition bufferWithSubpartition, 
boolean isBroadcast) {
     numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
     long readableBytes =
-        (long) bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+        (long) bufferWithSubpartition.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
     if (isBroadcast) {
       resultPartitionBytes.incAll(readableBytes);
     } else {
-      resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(), 
readableBytes);
+      resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(), 
readableBytes);
     }
     numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
   }
diff --git 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
--- 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
     doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
     doNothing().when(remoteShuffleOutputGate).regionFinish();
     
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    DataBuffer dataBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+    dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer, 
true);
   }
 
   private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
--- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
- * A {@link ResultPartition} which appends records and events to {@link 
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be 
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link 
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be 
copied and spilled to the
  * remote shuffle service in subpartition index order sequentially. Large 
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
  */
 public class RemoteShuffleResultPartition extends ResultPartition {
 
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
     delegation =
         new RemoteShuffleResultPartitionDelegation(
-            networkBufferSize,
-            outputGate,
-            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
-            numSubpartitions);
+            networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
   }
 
   @Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     super.setup();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
-        bufferPool,
-        bufferCompressor,
-        buffer -> canBeCompressed(buffer),
-        () -> checkInProduceState());
+        bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
   }
 
   @Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   @Override
   public synchronized void close() {
-    delegation.close(() -> super.close());
+    delegation.close(super::close);
   }
 
   @Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     return delegation;
   }
 
-  public void updateStatistics(
-      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+  public void updateStatistics(BufferWithSubpartition bufferWithSubpartition, 
boolean isBroadcast) {
     numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
     long readableBytes =
-        (long) bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+        (long) bufferWithSubpartition.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
     if (isBroadcast) {
       resultPartitionBytes.incAll(readableBytes);
     } else {
-      resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(), 
readableBytes);
+      resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(), 
readableBytes);
     }
     numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
   }
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 34bd3bab4..1a96a9bb6 100644
--- 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
     doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
     doNothing().when(remoteShuffleOutputGate).regionFinish();
     
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    DataBuffer dataBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+    dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer, 
true);
   }
 
   private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index c888b6ee9..37ff33903 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -37,15 +37,16 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
- * A {@link ResultPartition} which appends records and events to {@link 
SortBuffer} and after the
- * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be 
copied and spilled to the
+ * A {@link ResultPartition} which appends records and events to {@link 
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be 
copied and spilled to the
  * remote shuffle service in subpartition index order sequentially. Large 
records that can not be
- * appended to an empty {@link SortBuffer} will be spilled directly.
+ * appended to an empty {@link DataBuffer} will be spilled directly.
  */
 public class RemoteShuffleResultPartition extends ResultPartition {
 
@@ -77,10 +78,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
     delegation =
         new RemoteShuffleResultPartitionDelegation(
-            networkBufferSize,
-            outputGate,
-            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
-            numSubpartitions);
+            networkBufferSize, outputGate, this::updateStatistics, 
numSubpartitions);
   }
 
   @Override
@@ -88,10 +86,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     super.setup();
     BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
     delegation.setup(
-        bufferPool,
-        bufferCompressor,
-        buffer -> canBeCompressed(buffer),
-        () -> checkInProduceState());
+        bufferPool, bufferCompressor, this::canBeCompressed, 
this::checkInProduceState);
   }
 
   @Override
@@ -136,7 +131,7 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
 
   @Override
   public synchronized void close() {
-    delegation.close(() -> super.close());
+    delegation.close(super::close);
   }
 
   @Override
@@ -211,15 +206,14 @@ public class RemoteShuffleResultPartition extends 
ResultPartition {
     return delegation;
   }
 
-  public void updateStatistics(
-      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+  public void updateStatistics(BufferWithSubpartition bufferWithSubpartition, 
boolean isBroadcast) {
     numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
     long readableBytes =
-        (long) bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+        (long) bufferWithSubpartition.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
     if (isBroadcast) {
       resultPartitionBytes.incAll(readableBytes);
     } else {
-      resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(), 
readableBytes);
+      resultPartitionBytes.inc(bufferWithSubpartition.getSubpartitionIndex(), 
readableBytes);
     }
     numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
   }
diff --git 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index df3c25191..dffb6239e 100644
--- 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -62,7 +62,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
 import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
@@ -129,10 +129,10 @@ public class RemoteShuffleResultPartitionSuiteJ {
     doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
     doNothing().when(remoteShuffleOutputGate).regionFinish();
     
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    DataBuffer dataBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+    dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer, 
true);
   }
 
   private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {

Reply via email to