This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 270afaa78 [MINOR] improve(server): Add debug log when cacheShuffleData 
(#2156)
270afaa78 is described below

commit 270afaa78453ac161078f84f29b1010de6776e99
Author: maobaolong <[email protected]>
AuthorDate: Sat Oct 12 14:17:00 2024 +0800

    [MINOR] improve(server): Add debug log when cacheShuffleData (#2156)
    
    ### What changes were proposed in this pull request?
    
    Add debug log when cacheShuffleData
    
    ### Why are the changes needed?
    
    Supply a way to know the cacheShuffleData detail, include the partitionId, 
blockCount, we can know well from this debug level log.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No need.
---
 .../uniffle/common/ShufflePartitionedBlock.java    | 38 ++++++++++++++--------
 .../uniffle/common/ShufflePartitionedData.java     | 29 +++++++++++------
 .../common/ShufflePartitionedBlockTest.java        |  6 ++--
 .../uniffle/server/ShuffleServerGrpcService.java   | 29 ++++++++---------
 .../apache/uniffle/server/ShuffleTaskManager.java  |  4 +--
 .../server/buffer/AbstractShuffleBuffer.java       |  8 ++---
 .../server/buffer/ShuffleBufferManager.java        |  9 ++++-
 .../server/buffer/ShuffleBufferWithLinkedList.java | 10 +++---
 .../server/buffer/ShuffleBufferWithSkipList.java   | 10 +++---
 .../server/netty/ShuffleServerNettyHandler.java    | 31 ++++++++++--------
 .../server/storage/SingleStorageManager.java       |  2 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     |  6 ++--
 .../buffer/ShuffleBufferWithLinkedListTest.java    | 10 +++---
 .../handler/impl/HadoopShuffleWriteHandler.java    |  2 +-
 .../handler/impl/LocalFileWriteHandler.java        |  2 +-
 .../storage/HadoopShuffleHandlerTestBase.java      |  4 +--
 .../handler/impl/HadoopShuffleReadHandlerTest.java |  2 +-
 .../impl/LocalFileServerReadHandlerTest.java       |  4 +--
 18 files changed, 117 insertions(+), 89 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 1ce68b6b6..e476e37f3 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -24,7 +24,7 @@ import io.netty.buffer.Unpooled;
 
 public class ShufflePartitionedBlock {
 
-  private int length;
+  private int dataLength;
   private long crc;
   private long blockId;
   private int uncompressLength;
@@ -32,8 +32,13 @@ public class ShufflePartitionedBlock {
   private long taskAttemptId;
 
   public ShufflePartitionedBlock(
-      int length, int uncompressLength, long crc, long blockId, long 
taskAttemptId, byte[] data) {
-    this.length = length;
+      int dataLength,
+      int uncompressLength,
+      long crc,
+      long blockId,
+      long taskAttemptId,
+      byte[] data) {
+    this.dataLength = dataLength;
     this.crc = crc;
     this.blockId = blockId;
     this.uncompressLength = uncompressLength;
@@ -42,8 +47,13 @@ public class ShufflePartitionedBlock {
   }
 
   public ShufflePartitionedBlock(
-      int length, int uncompressLength, long crc, long blockId, long 
taskAttemptId, ByteBuf data) {
-    this.length = length;
+      int dataLength,
+      int uncompressLength,
+      long crc,
+      long blockId,
+      long taskAttemptId,
+      ByteBuf data) {
+    this.dataLength = dataLength;
     this.crc = crc;
     this.blockId = blockId;
     this.uncompressLength = uncompressLength;
@@ -53,8 +63,8 @@ public class ShufflePartitionedBlock {
 
   // calculate the data size for this block in memory including metadata which 
are
   // blockId, crc, taskAttemptId, length, uncompressLength
-  public long getSize() {
-    return length + 3 * 8 + 2 * 4;
+  public long getEncodedLength() {
+    return dataLength + 3 * 8 + 2 * 4;
   }
 
   @Override
@@ -66,7 +76,7 @@ public class ShufflePartitionedBlock {
       return false;
     }
     ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
-    return length == that.length
+    return dataLength == that.dataLength
         && crc == that.crc
         && blockId == that.blockId
         && data.equals(that.data);
@@ -74,15 +84,15 @@ public class ShufflePartitionedBlock {
 
   @Override
   public int hashCode() {
-    return Objects.hash(length, crc, blockId, data);
+    return Objects.hash(dataLength, crc, blockId, data);
   }
 
-  public int getLength() {
-    return length;
+  public int getDataLength() {
+    return dataLength;
   }
 
-  public void setLength(int length) {
-    this.length = length;
+  public void setDataLength(int dataLength) {
+    this.dataLength = dataLength;
   }
 
   public long getCrc() {
@@ -126,7 +136,7 @@ public class ShufflePartitionedBlock {
     return "ShufflePartitionedBlock{blockId["
         + blockId
         + "], length["
-        + length
+        + dataLength
         + "], uncompressLength["
         + uncompressLength
         + "], crc["
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index 6793eccd0..fbc3c2797 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.common;
 import java.util.Arrays;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
 
 public class ShufflePartitionedData {
 
@@ -28,23 +27,29 @@ public class ShufflePartitionedData {
       new ShufflePartitionedBlock[] {};
   private int partitionId;
   private final ShufflePartitionedBlock[] blockList;
-  private final long totalBlockSize;
+  private final long totalBlockEncodedLength;
+  private final long totalBlockDataLength;
 
-  public ShufflePartitionedData(int partitionId, Pair<Long, 
ShufflePartitionedBlock[]> pair) {
+  public ShufflePartitionedData(
+      int partitionId, long encodedLength, long dataLength, 
ShufflePartitionedBlock[] blockList) {
     this.partitionId = partitionId;
-    this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : 
pair.getRight();
-    totalBlockSize = pair.getLeft();
+    this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
+    totalBlockEncodedLength = encodedLength;
+    totalBlockDataLength = dataLength;
   }
 
   @VisibleForTesting
   public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] 
blockList) {
     this.partitionId = partitionId;
     this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
-    long size = 0L;
+    long encodedLength = 0L;
+    long dataLength = 0L;
     for (ShufflePartitionedBlock block : this.blockList) {
-      size += block.getSize();
+      encodedLength += block.getEncodedLength();
+      dataLength += block.getDataLength();
     }
-    totalBlockSize = size;
+    totalBlockEncodedLength = encodedLength;
+    totalBlockDataLength = dataLength;
   }
 
   @Override
@@ -68,7 +73,11 @@ public class ShufflePartitionedData {
     return blockList;
   }
 
-  public long getTotalBlockSize() {
-    return totalBlockSize;
+  public long getTotalBlockEncodedLength() {
+    return totalBlockEncodedLength;
+  }
+
+  public long getTotalBlockDataLength() {
+    return totalBlockDataLength;
   }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 3f894ab9e..eb79ede63 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -37,7 +37,7 @@ public class ShufflePartitionedBlockTest {
     new Random().nextBytes(buf);
 
     ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 1, 2, 3, 1, 
buf);
-    assertEquals(1, b1.getLength());
+    assertEquals(1, b1.getDataLength());
     assertEquals(2, b1.getCrc());
     assertEquals(3, b1.getBlockId());
 
@@ -73,7 +73,7 @@ public class ShufflePartitionedBlockTest {
         "ShufflePartitionedBlock{blockId["
             + b1.getBlockId()
             + "], length["
-            + b1.getLength()
+            + b1.getDataLength()
             + "], uncompressLength["
             + b1.getUncompressLength()
             + "], crc["
@@ -87,6 +87,6 @@ public class ShufflePartitionedBlockTest {
   @Test
   public void testSize() {
     ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, 
new byte[6]);
-    assertEquals(b1.getSize(), b1.getLength() + 3 * Long.BYTES + 2 * 
Integer.BYTES);
+    assertEquals(b1.getEncodedLength(), b1.getDataLength() + 3 * Long.BYTES + 
2 * Integer.BYTES);
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index c9375123f..d43d1e71f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -34,7 +34,6 @@ import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -467,10 +466,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           return;
         }
         final long start = System.currentTimeMillis();
-        List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
+        List<ShufflePartitionedData> shufflePartitionedDataList = 
toPartitionedDataList(req);
         long alreadyReleasedSize = 0;
         boolean hasFailureOccurred = false;
-        for (ShufflePartitionedData spd : shufflePartitionedData) {
+        for (ShufflePartitionedData spd : shufflePartitionedDataList) {
           String shuffleDataInfo =
               "appId["
                   + appId
@@ -496,7 +495,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 // TODO: Use ShuffleBufferWithSkipList to avoid caching block 
here.
                 shuffleServer.getShuffleMergeManager().cacheBlock(appId, 
shuffleId, spd);
               }
-              long toReleasedSize = spd.getTotalBlockSize();
+              long toReleasedSize = spd.getTotalBlockEncodedLength();
               // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
               manager.releasePreAllocatedSize(toReleasedSize);
               alreadyReleasedSize += toReleasedSize;
@@ -529,7 +528,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
             if (hasFailureOccurred) {
               shuffleServer
                   .getShuffleBufferManager()
-                  .releaseMemory(spd.getTotalBlockSize(), false, false);
+                  .releaseMemory(spd.getTotalBlockEncodedLength(), false, 
false);
             }
           }
         }
@@ -559,7 +558,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                   + "], cost "
                   + costTime
                   + " ms with "
-                  + shufflePartitionedData.size()
+                  + shufflePartitionedDataList.size()
                   + " blocks and "
                   + requireSize
                   + " bytes");
@@ -1660,24 +1659,23 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     }
   }
 
-  private List<ShufflePartitionedData> 
toPartitionedData(SendShuffleDataRequest req) {
+  private List<ShufflePartitionedData> 
toPartitionedDataList(SendShuffleDataRequest req) {
     List<ShufflePartitionedData> ret = Lists.newArrayList();
 
     for (ShuffleData data : req.getShuffleDataList()) {
-      ret.add(
-          new ShufflePartitionedData(
-              data.getPartitionId(), toPartitionedBlock(data.getBlockList())));
+      ret.add(toPartitionedData(data.getPartitionId(), data.getBlockList()));
     }
 
     return ret;
   }
 
-  private Pair<Long, ShufflePartitionedBlock[]> 
toPartitionedBlock(List<ShuffleBlock> blocks) {
+  private ShufflePartitionedData toPartitionedData(int partitionId, 
List<ShuffleBlock> blocks) {
     if (blocks == null || blocks.size() == 0) {
-      return Pair.of(0L, new ShufflePartitionedBlock[] {});
+      return new ShufflePartitionedData(partitionId, 0L, 0L, new 
ShufflePartitionedBlock[] {});
     }
     ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
-    long size = 0L;
+    long encodedLength = 0L;
+    long dataLength = 0L;
     int i = 0;
     for (ShuffleBlock block : blocks) {
       ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
@@ -1689,10 +1687,11 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               block.getBlockId(),
               block.getTaskAttemptId(),
               data);
-      size += ret[i].getSize();
+      encodedLength += ret[i].getEncodedLength();
+      dataLength += ret[i].getDataLength();
       i++;
     }
-    return Pair.of(size, ret);
+    return new ShufflePartitionedData(partitionId, encodedLength, dataLength, 
ret);
   }
 
   private Map<Integer, long[]> toPartitionBlocksMap(List<PartitionToBlockIds> 
partitionToBlockIds) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 8f7cfd4d4..e51ab5352 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -332,7 +332,7 @@ public class ShuffleTaskManager {
       String appId, int shuffleId, boolean isPreAllocated, 
ShufflePartitionedData spd) {
     refreshAppId(appId);
     long partitionSize = getPartitionDataSize(appId, shuffleId, 
spd.getPartitionId());
-    long deltaSize = spd.getTotalBlockSize();
+    long deltaSize = spd.getTotalBlockEncodedLength();
     partitionSize += deltaSize;
     // We do not need to check the huge partition size here, after old client 
upgraded to this
     // version,
@@ -506,7 +506,7 @@ public class ShuffleTaskManager {
     synchronized (bitmap) {
       for (ShufflePartitionedBlock spb : spbs) {
         bitmap.addLong(spb.getBlockId());
-        size += spb.getSize();
+        size += spb.getEncodedLength();
       }
     }
     long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, 
partitionId, size);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 62e2728bf..f520603da 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -126,7 +126,7 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
       } catch (Exception e) {
         LOG.error(
             "Unexpected exception for System.arraycopy, length["
-                + block.getLength()
+                + block.getDataLength()
                 + "], offset["
                 + offset
                 + "], dataLength["
@@ -135,7 +135,7 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
             e);
         throw e;
       }
-      offset += block.getLength();
+      offset += block.getDataLength();
     }
   }
 
@@ -168,13 +168,13 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
           new BufferSegment(
               block.getBlockId(),
               currentOffset,
-              block.getLength(),
+              block.getDataLength(),
               block.getUncompressLength(),
               block.getCrc(),
               block.getTaskAttemptId()));
       readBlocks.add(block);
       // update offset
-      currentOffset += block.getLength();
+      currentOffset += block.getDataLength();
       // check if length >= request buffer size
       if (currentOffset >= readBufferSize) {
         break;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 18dd94c19..aac9be42e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -226,10 +226,17 @@ public class ShuffleBufferManager {
       Arrays.stream(spd.getBlockList())
           .forEach(
               b -> {
-                int blockSize = b.getLength();
+                int blockSize = b.getDataLength();
                 
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
               });
     }
+    LOG.debug(
+        "cache shuffle data, size: {}, blockCount: {}, appId: {}, shuffleId: 
{}, partitionId: {}",
+        spd.getTotalBlockDataLength(),
+        spd.getBlockList().length,
+        appId,
+        shuffleId,
+        spd.getPartitionId());
     updateShuffleSize(appId, shuffleId, size);
     synchronized (this) {
       flushSingleBufferIfNecessary(
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 6f94d7a50..9597cbfe3 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -58,7 +58,7 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
         // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
         // block would gc without release. Here we must release the duplicated 
block.
         if (blocks.add(block)) {
-          size += block.getSize();
+          size += block.getEncodedLength();
         } else {
           block.getData().release();
         }
@@ -126,10 +126,10 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     for (ShufflePartitionedBlock spb : blocks) {
       try {
         spb.getData().release();
-        releasedSize += spb.getSize();
+        releasedSize += spb.getEncodedLength();
       } catch (Throwable t) {
         lastException = t;
-        failedToReleaseSize += spb.getSize();
+        failedToReleaseSize += spb.getEncodedLength();
       }
     }
     if (lastException != null) {
@@ -261,13 +261,13 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
           new BufferSegment(
               block.getBlockId(),
               currentOffset,
-              block.getLength(),
+              block.getDataLength(),
               block.getUncompressLength(),
               block.getCrc(),
               block.getTaskAttemptId()));
       readBlocks.add(block);
       // update offset
-      currentOffset += block.getLength();
+      currentOffset += block.getDataLength();
       if (currentOffset >= readBufferSize) {
         break;
       }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d8ed3b2db..50e6d686f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -68,7 +68,7 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
         if (!blocksMap.containsKey(block.getBlockId())) {
           blocksMap.put(block.getBlockId(), block);
           blockCount++;
-          size += block.getSize();
+          size += block.getEncodedLength();
         } else {
           block.getData().release();
         }
@@ -127,10 +127,10 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     for (ShufflePartitionedBlock spb : blocksMap.values()) {
       try {
         spb.getData().release();
-        releasedSize += spb.getSize();
+        releasedSize += spb.getEncodedLength();
       } catch (Throwable t) {
         lastException = t;
-        failedToReleaseSize += spb.getSize();
+        failedToReleaseSize += spb.getEncodedLength();
       }
     }
     if (lastException != null) {
@@ -249,13 +249,13 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
           new BufferSegment(
               block.getBlockId(),
               currentOffset,
-              block.getLength(),
+              block.getDataLength(),
               block.getUncompressLength(),
               block.getCrc(),
               block.getTaskAttemptId()));
       readBlocks.add(block);
       // update offset
-      currentOffset += block.getLength();
+      currentOffset += block.getDataLength();
       if (currentOffset >= readBufferSize) {
         break;
       }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index e40bf4d87..3c7bb736f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -30,7 +30,6 @@ import io.netty.channel.ChannelFutureListener;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,7 +166,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               + ", requireBlocksSize="
               + requireBlocksSize
               + ", stageAttemptNumber="
-              + stageAttemptNumber);
+              + stageAttemptNumber
+              + ", partitionCount="
+              + req.getPartitionToBlocks().size());
 
       ShuffleTaskInfo taskInfo = 
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
       if (taskInfo == null) {
@@ -274,10 +275,10 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         }
         final long start = System.currentTimeMillis();
         shuffleBufferManager.releaseMemory(req.encodedLength(), false, true);
-        List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
+        List<ShufflePartitionedData> shufflePartitionedDataList = 
toPartitionedDataList(req);
         long alreadyReleasedSize = 0;
         boolean hasFailureOccurred = false;
-        for (ShufflePartitionedData spd : shufflePartitionedData) {
+        for (ShufflePartitionedData spd : shufflePartitionedDataList) {
           String shuffleDataInfo =
               "appId["
                   + appId
@@ -301,7 +302,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               responseMessage = errorMsg;
               hasFailureOccurred = true;
             } else {
-              long toReleasedSize = spd.getTotalBlockSize();
+              long toReleasedSize = spd.getTotalBlockEncodedLength();
               // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
               shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
               alreadyReleasedSize += toReleasedSize;
@@ -333,7 +334,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             // Once the cache failure occurs, we should explicitly release 
data held by byteBuf
             if (hasFailureOccurred) {
               Arrays.stream(spd.getBlockList()).forEach(block -> 
block.getData().release());
-              shuffleBufferManager.releaseMemory(spd.getTotalBlockSize(), 
false, false);
+              
shuffleBufferManager.releaseMemory(spd.getTotalBlockEncodedLength(), false, 
false);
             }
           }
         }
@@ -359,7 +360,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                   + "], cost "
                   + costTime
                   + " ms with "
-                  + shufflePartitionedData.size()
+                  + shufflePartitionedDataList.size()
                   + " blocks and "
                   + requireBlocksSize
                   + " bytes");
@@ -750,21 +751,22 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     }
   }
 
-  private List<ShufflePartitionedData> 
toPartitionedData(SendShuffleDataRequest req) {
+  private List<ShufflePartitionedData> 
toPartitionedDataList(SendShuffleDataRequest req) {
     List<ShufflePartitionedData> ret = Lists.newArrayList();
 
     for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : 
req.getPartitionToBlocks().entrySet()) {
-      ret.add(new ShufflePartitionedData(entry.getKey(), 
toPartitionedBlock(entry.getValue())));
+      ret.add(toPartitionedData(entry.getKey(), entry.getValue()));
     }
     return ret;
   }
 
-  private Pair<Long, ShufflePartitionedBlock[]> 
toPartitionedBlock(List<ShuffleBlockInfo> blocks) {
+  private ShufflePartitionedData toPartitionedData(int partitionId, 
List<ShuffleBlockInfo> blocks) {
     if (blocks == null || blocks.size() == 0) {
-      return Pair.of(0L, new ShufflePartitionedBlock[] {});
+      return new ShufflePartitionedData(partitionId, 0L, 0L, new 
ShufflePartitionedBlock[] {});
     }
     ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
-    long size = 0L;
+    long encodedLength = 0L;
+    long dataLength = 0L;
     int i = 0;
     for (ShuffleBlockInfo block : blocks) {
       ret[i] =
@@ -775,10 +777,11 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               block.getBlockId(),
               block.getTaskAttemptId(),
               block.getData());
-      size += ret[i].getSize();
+      encodedLength += ret[i].getEncodedLength();
+      dataLength += ret[i].getDataLength();
       i++;
     }
-    return Pair.of(size, ret);
+    return new ShufflePartitionedData(partitionId, encodedLength, dataLength, 
ret);
   }
 
   private StatusCode verifyRequest(String appId) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index fb974d5f5..936a7550f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -137,7 +137,7 @@ public abstract class SingleStorageManager implements 
StorageManager {
     long length = 0;
     long blockNum = 0;
     for (ShufflePartitionedBlock block : event.getShuffleBlocks()) {
-      length += block.getLength();
+      length += block.getDataLength();
       blockNum++;
     }
     List<Integer> partitions = Lists.newArrayList();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index a0c06f541..2a67c548e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -295,14 +295,14 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
 
     // case1
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
-    long size1 = partitionedData0.getTotalBlockSize();
+    long size1 = partitionedData0.getTotalBlockEncodedLength();
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
 
     assertEquals(size1, 
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
 
     // case2
     partitionedData0 = createPartitionedData(1, 1, 35);
-    long size2 = partitionedData0.getTotalBlockSize();
+    long size2 = partitionedData0.getTotalBlockEncodedLength();
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
     assertEquals(size1 + size2, 
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
     assertEquals(
@@ -1159,7 +1159,7 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
     for (ShufflePartitionedBlock block : blocks) {
       for (BufferSegment bs : bufferSegments) {
         if (bs.getBlockId() == block.getBlockId()) {
-          assertEquals(block.getLength(), bs.getLength());
+          assertEquals(block.getDataLength(), bs.getLength());
           assertEquals(block.getCrc(), bs.getCrc());
           matchNum++;
           break;
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index f2037b58f..151232af2 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -599,14 +599,14 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
   private byte[] getExpectedData(ShufflePartitionedData... spds) {
     int size = 0;
     for (ShufflePartitionedData spd : spds) {
-      size += spd.getBlockList()[0].getLength();
+      size += spd.getBlockList()[0].getDataLength();
     }
     byte[] expectedData = new byte[size];
     int offset = 0;
     for (ShufflePartitionedData spd : spds) {
       ShufflePartitionedBlock block = spd.getBlockList()[0];
-      ByteBufUtils.readBytes(block.getData(), expectedData, offset, 
block.getLength());
-      offset += block.getLength();
+      ByteBufUtils.readBytes(block.getData(), expectedData, offset, 
block.getDataLength());
+      offset += block.getDataLength();
     }
     return expectedData;
   }
@@ -623,10 +623,10 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
       ShufflePartitionedBlock spb = blocks.get(i);
       BufferSegment segment = bufferSegments.get(segmentIndex);
       assertEquals(spb.getBlockId(), segment.getBlockId());
-      assertEquals(spb.getLength(), segment.getLength());
+      assertEquals(spb.getDataLength(), segment.getLength());
       assertEquals(spb.getCrc(), segment.getCrc());
       assertEquals(offset, segment.getOffset());
-      offset += spb.getLength();
+      offset += spb.getDataLength();
       segmentIndex++;
     }
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index eb81c4089..d6339975a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -132,7 +132,7 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
               new FileBasedShuffleSegment(
                   blockId,
                   startOffset,
-                  block.getLength(),
+                  block.getDataLength(),
                   block.getUncompressLength(),
                   crc,
                   block.getTaskAttemptId());
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 4b06e5aa9..a00e33d2f 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -109,7 +109,7 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
             new FileBasedShuffleSegment(
                 blockId,
                 startOffset,
-                block.getLength(),
+                block.getDataLength(),
                 block.getUncompressLength(),
                 crc,
                 block.getTaskAttemptId());
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
 
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index ea29a4768..1e936b404 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -96,11 +96,11 @@ public class HadoopShuffleHandlerTestBase {
           new FileBasedShuffleSegment(
               spb.getBlockId(),
               offset,
-              spb.getLength(),
+              spb.getDataLength(),
               spb.getUncompressLength(),
               spb.getCrc(),
               1);
-      offset += spb.getLength();
+      offset += spb.getDataLength();
       segments.add(segment);
       if (doWrite) {
         writer.writeData(ByteBufUtils.readBytes(spb.getData()));
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index d1b663f1f..1b2b7e93b 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -212,7 +212,7 @@ public class HadoopShuffleReadHandlerTest extends 
HadoopTestBase {
                 new FileBasedShuffleSegment(
                     blockId,
                     startOffset,
-                    block.getLength(),
+                    block.getDataLength(),
                     block.getUncompressLength(),
                     crc,
                     block.getTaskAttemptId());
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index 2a55ae4de..5b2db2daf 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -65,11 +65,11 @@ public class LocalFileServerReadHandlerTest {
                 new FileBasedShuffleSegment(
                     block.getBlockId(),
                     offset,
-                    block.getLength(),
+                    block.getDataLength(),
                     block.getUncompressLength(),
                     block.getCrc(),
                     block.getTaskAttemptId());
-            offset += block.getLength();
+            offset += block.getDataLength();
             LocalFileHandlerTestBase.writeIndex(byteBuffer, segment);
           }
         },


Reply via email to