TanYuxin-tyx commented on code in PR #2448:
URL: https://github.com/apache/celeborn/pull/2448#discussion_r1555357757


##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java:
##########
@@ -129,14 +130,14 @@ public PartitionSortedBuffer(
       checkArgument(customReadOrder.length == numSubpartitions, "Illegal data 
read order.");
       System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, 
numSubpartitions);
     } else {
-      for (int channel = 0; channel < numSubpartitions; ++channel) {
-        this.subpartitionReadOrder[channel] = channel;
+      for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
+        this.subpartitionReadOrder[subpartition] = subpartition;
       }
     }
   }
 
   @Override
-  public boolean append(ByteBuffer source, int targetChannel, DataType 
dataType)
+  public boolean append(ByteBuffer source, int targetSubpartition, DataType 
dataType)
       throws IOException {
     checkArgument(source.hasRemaining(), "Cannot append empty data.");
     checkState(!isFinished, "Sort buffer is already finished.");

Review Comment:
   ditto — the `Sort buffer` wording in this message should also be renamed to `data buffer`.



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java:
##########
@@ -246,17 +247,17 @@ public void broadcast(ByteBuffer record, Buffer.DataType 
dataType) throws IOExce
     emit(record, 0, dataType, true);
   }
 
-  public void releaseSortBuffer(SortBuffer sortBuffer) {
-    if (sortBuffer != null) {
-      sortBuffer.release();
+  public void releaseDataBuffer(DataBuffer dataBuffer) {
+    if (dataBuffer != null) {
+      dataBuffer.release();
     }
   }
 
   public void finish() throws IOException {
     Utils.checkState(
-        unicastSortBuffer == null || unicastSortBuffer.isReleased(),
+        unicastDataBuffer == null || unicastDataBuffer.isReleased(),
         "The unicast sort buffer should be either null or released.");

Review Comment:
   `data buffer` here



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java:
##########
@@ -262,15 +263,15 @@ private void updateWriteSegmentIndexAndOffset(int 
numBytes) {
   }
 
   @Override
-  public BufferWithChannel copyIntoSegment(
-      MemorySegment target, BufferRecycler recycler, int offset) {
+  public BufferWithSubpartition getNextBuffer(
+      MemorySegment transitBuffer, BufferRecycler recycler, int offset) {
     checkState(hasRemaining(), "No data remaining.");
     checkState(isFinished, "Should finish the sort buffer first before copying 
any data.");

Review Comment:
   Many other places in the comments still say `sort buffer` or `Sort buffer`; 
we can fix them together.



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java:
##########
@@ -127,70 +128,70 @@ public void emit(
   }
 
   @VisibleForTesting
-  public SortBuffer getUnicastSortBuffer() throws IOException {
-    flushBroadcastSortBuffer();
+  public DataBuffer getUnicastDataBuffer() throws IOException {
+    flushBroadcastDataBuffer();
 
-    if (unicastSortBuffer != null && !unicastSortBuffer.isFinished()) {
-      return unicastSortBuffer;
+    if (unicastDataBuffer != null && !unicastDataBuffer.isFinished()) {
+      return unicastDataBuffer;
     }
 
-    unicastSortBuffer =
-        new PartitionSortedBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
-    return unicastSortBuffer;
+    unicastDataBuffer =
+        new SortBasedDataBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
+    return unicastDataBuffer;
   }
 
-  public SortBuffer getBroadcastSortBuffer() throws IOException {
-    flushUnicastSortBuffer();
+  public DataBuffer getBroadcastDataBuffer() throws IOException {
+    flushUnicastDataBuffer();
 
-    if (broadcastSortBuffer != null && !broadcastSortBuffer.isFinished()) {
-      return broadcastSortBuffer;
+    if (broadcastDataBuffer != null && !broadcastDataBuffer.isFinished()) {
+      return broadcastDataBuffer;
     }
 
-    broadcastSortBuffer =
-        new PartitionSortedBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
-    return broadcastSortBuffer;
+    broadcastDataBuffer =
+        new SortBasedDataBuffer(bufferPool, numSubpartitions, 
networkBufferSize, null);
+    return broadcastDataBuffer;
   }
 
-  public void flushBroadcastSortBuffer() throws IOException {
-    flushSortBuffer(broadcastSortBuffer, true);
+  public void flushBroadcastDataBuffer() throws IOException {
+    flushDataBuffer(broadcastDataBuffer, true);
   }
 
-  public void flushUnicastSortBuffer() throws IOException {
-    flushSortBuffer(unicastSortBuffer, false);
+  public void flushUnicastDataBuffer() throws IOException {
+    flushDataBuffer(unicastDataBuffer, false);
   }
 
   @VisibleForTesting
-  void flushSortBuffer(SortBuffer sortBuffer, boolean isBroadcast) throws 
IOException {
-    if (sortBuffer == null || sortBuffer.isReleased()) {
+  void flushDataBuffer(DataBuffer dataBuffer, boolean isBroadcast) throws 
IOException {
+    if (dataBuffer == null || dataBuffer.isReleased()) {
       return;
     }
-    sortBuffer.finish();
-    if (sortBuffer.hasRemaining()) {
+    dataBuffer.finish();
+    if (dataBuffer.hasRemaining()) {
       try {
         outputGate.regionStart(isBroadcast);
-        while (sortBuffer.hasRemaining()) {
+        while (dataBuffer.hasRemaining()) {
           MemorySegment segment = 
outputGate.getBufferPool().requestMemorySegmentBlocking();
-          SortBuffer.BufferWithChannel bufferWithChannel;
+          BufferWithSubpartition bufferWithSubpartition;
           try {
-            bufferWithChannel =
-                sortBuffer.copyIntoSegment(
+            bufferWithSubpartition =
+                dataBuffer.getNextBuffer(
                     segment, outputGate.getBufferPool(), 
BufferUtils.HEADER_LENGTH);
           } catch (Throwable t) {
             outputGate.getBufferPool().recycle(segment);
             throw new FlinkRuntimeException("Shuffle write failure.", t);
           }
 
-          Buffer buffer = bufferWithChannel.getBuffer();
-          int subpartitionIndex = bufferWithChannel.getChannelIndex();
-          statisticsConsumer.accept(bufferWithChannel, isBroadcast);
+          Buffer buffer = bufferWithSubpartition.getBuffer();
+          int subpartitionIndex = 
bufferWithSubpartition.getSubpartitionIndex();
+          statisticsConsumer.accept(bufferWithSubpartition, isBroadcast);
           writeCompressedBufferIfPossible(buffer, subpartitionIndex);
         }
         outputGate.regionFinish();
       } catch (InterruptedException e) {
         throw new IOException("Failed to flush the sort buffer, broadcast=" + 
isBroadcast, e);

Review Comment:
   `data buffer`



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java:
##########
@@ -129,14 +130,14 @@ public PartitionSortedBuffer(
       checkArgument(customReadOrder.length == numSubpartitions, "Illegal data 
read order.");
       System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, 
numSubpartitions);
     } else {
-      for (int channel = 0; channel < numSubpartitions; ++channel) {
-        this.subpartitionReadOrder[channel] = channel;
+      for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
+        this.subpartitionReadOrder[subpartition] = subpartition;
       }
     }
   }
 
   @Override
-  public boolean append(ByteBuffer source, int targetChannel, DataType 
dataType)
+  public boolean append(ByteBuffer source, int targetSubpartition, DataType 
dataType)
       throws IOException {
     checkArgument(source.hasRemaining(), "Cannot append empty data.");

Review Comment:
   Here the message should also be renamed.



##########
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java:
##########
@@ -105,20 +106,20 @@ public void emit(
           targetSubpartition == 0, "Target subpartition index can only be 0 
when broadcast.");
     }
 
-    SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : 
getUnicastSortBuffer();
-    if (sortBuffer.append(record, targetSubpartition, dataType)) {
+    DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() : 
getUnicastDataBuffer();
+    if (dataBuffer.append(record, targetSubpartition, dataType)) {
       return;
     }
 
     try {
-      if (!sortBuffer.hasRemaining()) {
+      if (!dataBuffer.hasRemaining()) {
         // the record can not be appended to the free sort buffer because it 
is too large

Review Comment:
   `data buffer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to