This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b98b48819 [#2738] feat(server): add metrics to track shuffle data 
block count and avg block size (#2741)
b98b48819 is described below

commit b98b488198da3c130fa86be6e8e9f09407f4f84b
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Mar 19 16:56:54 2026 +0800

    [#2738] feat(server): add metrics to track shuffle data block count and avg 
block size (#2741)
    
    ### What changes were proposed in this pull request?
    Add metrics to track shuffle data block count and avg block size.
    
    ### Why are the changes needed?
    Fix: #2738
    We need to identify and stop applications that generate large numbers of 
blocks when heap memory is insufficient.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    CI and manual testing
---
 .../uniffle/common/ShufflePartitionedData.java     | 18 ++++++++++
 .../uniffle/server/ShuffleDataFlushEvent.java      |  4 +++
 .../uniffle/server/ShuffleServerGrpcService.java   |  3 +-
 .../uniffle/server/ShuffleServerMetrics.java       | 26 ++++++++++++++
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 14 ++++++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 15 +++++---
 .../server/TopNShuffleDataSizeOfAppCalcTask.java   | 41 ++++++++++++++++++++++
 .../server/buffer/ShuffleBufferManager.java        | 25 +++++++++++++
 .../server/buffer/ShuffleBufferWithLinkedList.java |  6 ++++
 .../server/buffer/ShuffleBufferWithSkipList.java   |  6 ++++
 .../org/apache/uniffle/server/merge/Partition.java |  2 +-
 .../server/netty/ShuffleServerNettyHandler.java    |  3 +-
 .../server/KerberizedShuffleTaskManagerTest.java   |  4 +--
 .../uniffle/server/ShuffleTaskManagerTest.java     | 41 +++++++++++-----------
 .../server/buffer/ShuffleBufferManagerTest.java    | 10 +++---
 15 files changed, 181 insertions(+), 37 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index fbc3c2797..5a49bfd27 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -29,6 +29,8 @@ public class ShufflePartitionedData {
   private final ShufflePartitionedBlock[] blockList;
   private final long totalBlockEncodedLength;
   private final long totalBlockDataLength;
+  private int duplicateBlockCount;
+  private long duplicateBlockSize;
 
   public ShufflePartitionedData(
       int partitionId, long encodedLength, long dataLength, 
ShufflePartitionedBlock[] blockList) {
@@ -80,4 +82,20 @@ public class ShufflePartitionedData {
   public long getTotalBlockDataLength() {
     return totalBlockDataLength;
   }
+
+  public int getDuplicateBlockCount() {
+    return duplicateBlockCount;
+  }
+
+  public void setDuplicateBlockCount(int duplicateBlockCount) {
+    this.duplicateBlockCount = duplicateBlockCount;
+  }
+
+  public long getDuplicateBlockSize() {
+    return duplicateBlockSize;
+  }
+
+  public void setDuplicateBlockSize(long duplicateBlockSize) {
+    this.duplicateBlockSize = duplicateBlockSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 7823cad04..04a18df00 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -120,6 +120,10 @@ public class ShuffleDataFlushEvent {
     return dataLength;
   }
 
+  public long getBlockCount() {
+    return shuffleBlocks.size();
+  }
+
   public String getAppId() {
     return appId;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bd35750d3..b62df9c32 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -458,8 +458,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
               manager.releasePreAllocatedSize(toReleasedSize);
               alreadyReleasedSize += toReleasedSize;
-              manager.updateCachedBlockIds(
-                  appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
+              manager.updateCachedBlockIds(appId, shuffleId, 
spd.getPartitionId(), spd);
             }
           } catch (ExceedHugePartitionHardLimitException e) {
             String errorMsg =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index ced8a17a0..1f4d451e5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -99,6 +99,8 @@ public class ShuffleServerMetrics {
   private static final String ALLOCATED_BUFFER_SIZE = "allocated_buffer_size";
   private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
   private static final String USED_BUFFER_SIZE = "used_buffer_size";
+  private static final String TOTAL_IN_MEMORY_BLOCK_COUNT = 
"total_in_memory_block_count";
+  private static final String TOTAL_IN_FLUSH_BLOCK_COUNT = 
"total_in_flush_block_count";
   private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
   public static final String USED_DIRECT_MEMORY_SIZE = 
"used_direct_memory_size";
   public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = 
"used_direct_memory_size_by_netty";
@@ -158,6 +160,10 @@ public class ShuffleServerMetrics {
   public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP = 
"topN_of_total_data_size_for_app";
   public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP =
       "topN_of_in_memory_data_size_for_app";
+  public static final String TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP =
+      "topN_of_in_memory_block_count_for_app";
+  public static final String BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP =
+      "bottomN_of_in_memory_avg_block_size_for_app";
   public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP =
       "topN_of_on_localfile_data_size_for_app";
   public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
@@ -239,6 +245,8 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeAllocatedBufferSize;
   public static Gauge.Child gaugeInFlushBufferSize;
   public static Gauge.Child gaugeUsedBufferSize;
+  public static Gauge.Child gaugeTotalInMemoryBlockCount;
+  public static Gauge.Child gaugeTotalInFlushBlockCount;
   public static Gauge.Child gaugeReadBufferUsedSize;
   public static Gauge.Child gaugeWriteHandler;
   public static Gauge.Child gaugeMergeEventQueueSize;
@@ -257,6 +265,8 @@ public class ShuffleServerMetrics {
 
   public static Gauge gaugeTotalDataSizeUsage;
   public static Gauge gaugeInMemoryDataSizeUsage;
+  public static Gauge gaugeInMemoryBlockCount;
+  public static Gauge gaugeInMemoryAvgBlockSize;
   public static Gauge gaugeOnDiskDataSizeUsage;
   public static Gauge gaugeOnHadoopDataSizeUsage;
 
@@ -477,6 +487,8 @@ public class ShuffleServerMetrics {
     gaugeAllocatedBufferSize = 
metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE);
     gaugeInFlushBufferSize = 
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
     gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
+    gaugeTotalInMemoryBlockCount = 
metricsManager.addLabeledGauge(TOTAL_IN_MEMORY_BLOCK_COUNT);
+    gaugeTotalInFlushBlockCount = 
metricsManager.addLabeledGauge(TOTAL_IN_FLUSH_BLOCK_COUNT);
     gaugeReadBufferUsedSize = 
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
     gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
     gaugeMergeEventQueueSize = 
metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE);
@@ -536,6 +548,20 @@ public class ShuffleServerMetrics {
             .labelNames("app_id")
             .register(metricsManager.getCollectorRegistry());
 
+    gaugeInMemoryBlockCount =
+        Gauge.build()
+            .name(TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP)
+            .help("top N of in memory shuffle block count for app level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
+
+    gaugeInMemoryAvgBlockSize =
+        Gauge.build()
+            .name(BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP)
+            .help("bottom N of in memory shuffle average block size for app 
level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
+
     gaugeOnDiskDataSizeUsage =
         Gauge.build()
             .name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP)
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index e748a892f..820c76357 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -60,6 +60,7 @@ public class ShuffleTaskInfo {
 
   private final AtomicLong totalDataSize = new AtomicLong(0);
   private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+  private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
   private final AtomicLong onLocalFileNum = new AtomicLong(0);
   private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
   private final AtomicLong onHadoopFileNum = new AtomicLong(0);
@@ -176,6 +177,19 @@ public class ShuffleTaskInfo {
     return inMemoryDataSize.get();
   }
 
+  public long getInMemoryBlockCount() {
+    return inMemoryBlockCount.get();
+  }
+
+  public void addInMemoryBlockCount(long delta) {
+    inMemoryBlockCount.addAndGet(delta);
+  }
+
+  public long getInMemoryAvgBlockSize() {
+    long blockCount = getInMemoryBlockCount();
+    return blockCount <= 0 ? Long.MAX_VALUE : getInMemoryDataSize() / 
blockCount;
+  }
+
   public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) {
     if (isNewlyCreated) {
       onLocalFileNum.incrementAndGet();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index f2c120464..33712244b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -481,12 +481,14 @@ public class ShuffleTaskManager {
   }
 
   // Only for tests
-  public void updateCachedBlockIds(String appId, int shuffleId, 
ShufflePartitionedBlock[] spbs) {
-    updateCachedBlockIds(appId, shuffleId, 0, spbs);
+  public void updateCachedBlockIds(
+      String appId, int shuffleId, ShufflePartitionedData 
shufflePartitionedData) {
+    updateCachedBlockIds(appId, shuffleId, 0, shufflePartitionedData);
   }
 
   public void updateCachedBlockIds(
-      String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] 
spbs) {
+      String appId, int shuffleId, int partitionId, ShufflePartitionedData 
shufflePartitionedData) {
+    ShufflePartitionedBlock[] spbs = shufflePartitionedData.getBlockList();
     if (spbs == null || spbs.length == 0) {
       return;
     }
@@ -512,7 +514,12 @@ public class ShuffleTaskManager {
         size += spb.getEncodedLength();
       }
     }
-    long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, 
partitionId, size);
+    int blockCount = spbs.length - 
shufflePartitionedData.getDuplicateBlockCount();
+    shuffleBufferManager.addInMemoryBlockCount(blockCount);
+    shuffleTaskInfo.addInMemoryBlockCount(blockCount);
+    long partitionSize =
+        shuffleTaskInfo.addPartitionDataSize(
+            shuffleId, partitionId, size - 
shufflePartitionedData.getDuplicateBlockSize());
     HugePartitionUtils.markHugePartition(
         shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, 
partitionSize);
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
 
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
index 97ffb3cf4..b835c904a 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -37,6 +37,8 @@ public class TopNShuffleDataSizeOfAppCalcTask {
 
   private final Gauge gaugeTotalDataSize;
   private final Gauge gaugeInMemoryDataSize;
+  private final Gauge gaugeInMemoryBlockCount;
+  private final Gauge gaugeInMemoryAvgBlockSize;
   private final Gauge gaugeOnLocalFileDataSize;
   private final Gauge gaugeOnHadoopDataSize;
 
@@ -50,6 +52,8 @@ public class TopNShuffleDataSizeOfAppCalcTask {
     shuffleTaskManager = taskManager;
     this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage;
     this.gaugeInMemoryDataSize = 
ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
+    this.gaugeInMemoryBlockCount = 
ShuffleServerMetrics.gaugeInMemoryBlockCount;
+    this.gaugeInMemoryAvgBlockSize = 
ShuffleServerMetrics.gaugeInMemoryAvgBlockSize;
     this.gaugeOnLocalFileDataSize = 
ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
     this.gaugeOnHadoopDataSize = 
ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
     this.scheduler =
@@ -72,6 +76,22 @@ public class TopNShuffleDataSizeOfAppCalcTask {
           .set(taskInfo.getValue().getInMemoryDataSize());
     }
 
+    topNTaskInfo = calcTopNInMemoryBlockCountTaskInfo();
+    gaugeInMemoryBlockCount.clear();
+    for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+      gaugeInMemoryBlockCount
+          .labels(taskInfo.getKey())
+          .set(taskInfo.getValue().getInMemoryBlockCount());
+    }
+
+    topNTaskInfo = calcBottomNInMemoryAvgBlockSizeTaskInfo();
+    gaugeInMemoryAvgBlockSize.clear();
+    for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+      gaugeInMemoryAvgBlockSize
+          .labels(taskInfo.getKey())
+          .set(taskInfo.getValue().getInMemoryAvgBlockSize());
+    }
+
     topNTaskInfo = calcTopNOnLocalFileDataSizeTaskInfo();
     gaugeOnLocalFileDataSize.clear();
     for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
@@ -108,6 +128,27 @@ public class TopNShuffleDataSizeOfAppCalcTask {
         .collect(Collectors.toList());
   }
 
+  public List<Map.Entry<String, ShuffleTaskInfo>> 
calcTopNInMemoryBlockCountTaskInfo() {
+    return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+        .sorted(
+            (e1, e2) ->
+                Long.compare(
+                    e2.getValue().getInMemoryBlockCount(), 
e1.getValue().getInMemoryBlockCount()))
+        .limit(topNShuffleDataNumber)
+        .collect(Collectors.toList());
+  }
+
+  public List<Map.Entry<String, ShuffleTaskInfo>> 
calcBottomNInMemoryAvgBlockSizeTaskInfo() {
+    return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+        .sorted(
+            (e1, e2) ->
+                Long.compare(
+                    e1.getValue().getInMemoryAvgBlockSize(),
+                    e2.getValue().getInMemoryAvgBlockSize()))
+        .limit(topNShuffleDataNumber)
+        .collect(Collectors.toList());
+  }
+
   public List<Map.Entry<String, ShuffleTaskInfo>> 
calcTopNOnLocalFileDataSizeTaskInfo() {
     return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
         .sorted(
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ed9331291..dee841898 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskInfo;
 import org.apache.uniffle.server.ShuffleTaskManager;
 import org.apache.uniffle.server.buffer.lab.ChunkCreator;
 import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
@@ -96,6 +97,8 @@ public class ShuffleBufferManager {
   protected AtomicLong inFlushSize = new AtomicLong(0L);
   protected AtomicLong usedMemory = new AtomicLong(0L);
   private AtomicLong readDataMemory = new AtomicLong(0L);
+  private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
+  private final AtomicLong inFlushBlockCount = new AtomicLong(0);
   // appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId
   protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> 
bufferPool;
   // appId -> shuffleId -> shuffle size in buffer
@@ -496,11 +499,22 @@ public class ShuffleBufferManager {
               shuffleFlushManager.getDataDistributionType(appId));
       if (event != null) {
         event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(), 
true, false));
+        event.addCleanupCallback(
+            () -> {
+              long blockCount = event.getBlockCount();
+              ShuffleTaskInfo shuffleTaskInfo = 
shuffleTaskManager.getShuffleTaskInfo(appId);
+              if (shuffleTaskInfo != null) {
+                shuffleTaskInfo.addInMemoryBlockCount(-blockCount);
+              }
+              addInMemoryBlockCount(-blockCount);
+              addInFlushBlockCount(-blockCount);
+            });
         updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
         inFlushSize.addAndGet(event.getEncodedLength());
         if (isHugePartition) {
           event.markOwnedByHugePartition();
         }
+        addInFlushBlockCount(event.getBlockCount());
         ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
         shuffleFlushManager.addToFlushQueue(event);
         return true;
@@ -524,6 +538,16 @@ public class ShuffleBufferManager {
     }
   }
 
+  public void addInMemoryBlockCount(long delta) {
+    long blockCount = inMemoryBlockCount.addAndGet(delta);
+    ShuffleServerMetrics.gaugeTotalInMemoryBlockCount.set(blockCount);
+  }
+
+  public void addInFlushBlockCount(long delta) {
+    long blockCount = inFlushBlockCount.addAndGet(delta);
+    ShuffleServerMetrics.gaugeTotalInFlushBlockCount.set(blockCount);
+  }
+
   public synchronized boolean requireMemory(long size, boolean isPreAllocated) 
{
     if (capacity - usedMemory.get() >= size) {
       usedMemory.addAndGet(size);
@@ -891,6 +915,7 @@ public class ShuffleBufferManager {
       Collection<ShuffleBuffer> buffers = 
bufferRangeMap.asMapOfRanges().values();
       if (buffers != null) {
         for (ShuffleBuffer buffer : buffers) {
+          addInMemoryBlockCount(-buffer.getBlockCount());
           // the actual released size by this thread
           long releasedSize = buffer.release();
           ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 6485eef55..156797bef 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -57,6 +57,8 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     }
     long currentEncodedLength = 0;
     long currentDataLength = 0;
+    int duplicateBlockCount = 0;
+    long duplicateBlockSize = 0;
 
     for (ShufflePartitionedBlock block : data.getBlockList()) {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
@@ -65,11 +67,15 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
         currentEncodedLength += block.getEncodedLength();
         currentDataLength += block.getDataLength();
       } else {
+        duplicateBlockCount++;
+        duplicateBlockSize += block.getEncodedLength();
         releaseBlock(block);
       }
     }
     this.encodedLength += currentEncodedLength;
     this.dataLength += currentDataLength;
+    data.setDuplicateBlockCount(duplicateBlockCount);
+    data.setDuplicateBlockSize(duplicateBlockSize);
 
     return currentEncodedLength;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 9c65eeee0..c442c9af6 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -64,6 +64,8 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     }
     long currentEncodedLength = 0;
     long currentDataLength = 0;
+    int duplicateBlockCount = 0;
+    long duplicateBlockSize = 0;
 
     for (ShufflePartitionedBlock block : data.getBlockList()) {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
@@ -74,11 +76,15 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
         currentEncodedLength += block.getEncodedLength();
         currentDataLength += block.getDataLength();
       } else {
+        duplicateBlockCount++;
+        duplicateBlockSize += block.getEncodedLength();
         releaseBlock(block);
       }
     }
     this.encodedLength += currentEncodedLength;
     this.dataLength += currentDataLength;
+    data.setDuplicateBlockCount(duplicateBlockCount);
+    data.setDuplicateBlockSize(duplicateBlockSize);
 
     return currentEncodedLength;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java 
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index e4000bc39..01ba26c8a 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -330,7 +330,7 @@ public class Partition<K, V> {
       shuffle
           .shuffleServer
           .getShuffleTaskManager()
-          .updateCachedBlockIds(appId, shuffle.shuffleId, 
spd.getPartitionId(), spd.getBlockList());
+          .updateCachedBlockIds(appId, shuffle.shuffleId, 
spd.getPartitionId(), spd);
       sleepTime = initSleepTime;
       return true;
     } else {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 0b9b5f83c..d6489deb6 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -317,8 +317,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
               shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
               alreadyReleasedSize += toReleasedSize;
-              shuffleTaskManager.updateCachedBlockIds(
-                  appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
+              shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
spd.getPartitionId(), spd);
             }
           } catch (ExceedHugePartitionHardLimitException e) {
             String errorMsg =
diff --git 
a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
index c21fb0db4..0a5508d8f 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
@@ -134,9 +134,9 @@ public class KerberizedShuffleTaskManagerTest extends 
KerberizedHadoopBase {
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 0, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
     shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0);
     shuffleTaskManager.refreshAppId(appId);
     shuffleTaskManager.checkResourceStatus();
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index b9cdf1e24..3356cca5a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -156,7 +156,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
       ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 
35);
       shuffleTaskManager.requireBuffer(35);
       shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
-      shuffleTaskManager.updateCachedBlockIds(appId, i, 
partitionedData0.getBlockList());
+      shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0);
     }
 
     assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -220,7 +220,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     // case3
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0);
     try {
       long requiredId =
           shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
@@ -232,7 +232,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     // case4
     partitionedData0 = createPartitionedData(1, 1, 500);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0);
     try {
       shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       fail("Should throw NoBufferForHugePartitionException");
@@ -255,7 +255,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     shuffleServer.getShuffleBufferManager().setBufferFlushThreshold(1024);
     partitionedData0 = createPartitionedData(1, 1, 500);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0);
     try {
       shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
       fail("Should throw NoBufferForHugePartitionException");
@@ -298,14 +298,14 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
     // case1
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
     long size1 = partitionedData0.getTotalBlockEncodedLength();
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0);
 
     assertEquals(size1, 
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
 
     // case2
     partitionedData0 = createPartitionedData(1, 1, 35);
     long size2 = partitionedData0.getTotalBlockEncodedLength();
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0);
     assertEquals(size1 + size2, 
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
     assertEquals(
         size1 + size2, 
shuffleTaskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1));
@@ -412,7 +412,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
     assertEquals(35, pabi.getRequireSize());
     StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, 
true, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData0);
     // the required id won't be removed in shuffleTaskManager, it is removed 
in Grpc service
     assertEquals(1, bufferIds.size());
     assertEquals(StatusCode.SUCCESS, sc);
@@ -429,7 +429,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
     bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData1);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData1.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData1);
     assertEquals(StatusCode.SUCCESS, sc);
     shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
@@ -439,7 +439,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     
expectedBlocks1.addAll(Lists.newArrayList(partitionedData2.getBlockList()));
     // receive un-preAllocation data
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
partitionedData2);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData2.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData2);
     assertEquals(StatusCode.SUCCESS, sc);
 
     // won't flush for partition 2-2
@@ -447,7 +447,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
     bufferId = shuffleTaskManager.requireBuffer(30);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData3);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData3.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData3);
     shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     assertEquals(StatusCode.SUCCESS, sc);
 
@@ -456,7 +456,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
     bufferId = shuffleTaskManager.requireBuffer(35);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData4);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData4.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData4);
     shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     assertEquals(StatusCode.SUCCESS, sc);
 
@@ -466,7 +466,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
 
     // flush for partition 1-1
     ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5);
     expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
     bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5);
@@ -483,7 +483,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
 
     // flush for partition 0-1
     ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7);
     bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7);
     assertEquals(StatusCode.SUCCESS, sc);
@@ -544,9 +544,9 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
     shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0);
     shuffleTaskManager.refreshAppId(appId);
     shuffleTaskManager.checkResourceStatus();
 
@@ -596,7 +596,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
       ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
       shuffleTaskManager.requireBuffer(35);
       shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
-      shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList());
+      shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0);
     }
 
     assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -666,8 +666,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
       Thread.sleep(1000);
       ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
       shuffleTaskManager.cacheShuffleData("clearTest1", shuffleId, false, partitionedData0);
-      shuffleTaskManager.updateCachedBlockIds(
-          "clearTest1", shuffleId, partitionedData0.getBlockList());
+      shuffleTaskManager.updateCachedBlockIds("clearTest1", shuffleId, partitionedData0);
       shuffleTaskManager.refreshAppId("clearTest1");
       shuffleTaskManager.checkResourceStatus();
       retry++;
@@ -1039,7 +1038,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
       Thread.sleep(1000);
       ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
       shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, shuffleData);
-      shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData.getBlockList());
+      shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData);
       shuffleTaskManager.refreshAppId(appId);
       shuffleTaskManager.checkResourceStatus();
 
@@ -1215,7 +1214,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
     shuffleTaskManager.refreshAppId(appId);
     shuffleTaskManager.checkResourceStatus();
     assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -1260,7 +1259,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
     shuffleTaskManager.requireBuffer(35);
     shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
-    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
     shuffleTaskManager.refreshAppId(appId);
     shuffleTaskManager.checkResourceStatus();
     assertEquals(1, shuffleTaskManager.getAppIds().size());
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 7dbba94b2..f2991b226 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -517,7 +517,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
     ShufflePartitionedData partitionedData = createData(0, 1);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData);
     assertEquals(1 + 32, shuffleBufferManager.getUsedMemory());
     long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0);
     assertEquals(1 + 32, usedSize);
@@ -532,7 +532,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     // case2: its partition is huge partition, its buffer will be flushed to DISK directly
     partitionedData = createData(0, 36);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData);
     assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
     assertTrue(
         HugePartitionUtils.limitHugePartition(
@@ -543,7 +543,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
             shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
     partitionedData = createData(0, 1);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData);
     waitForFlush(shuffleFlushManager, appId, shuffleId, 3);
   }
 
@@ -845,7 +845,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
     ShufflePartitionedData partitionedData = createData(0, 1);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData);
     long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0);
     assertEquals(1 + 32, usedSize);
     assertFalse(
@@ -855,7 +855,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     // case2: its partition exceed the split limit
     partitionedData = createData(0, 200);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData);
-    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList());
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData);
     usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0);
     assertEquals(1 + 32 + 200 + 32, usedSize);
     assertTrue(


Reply via email to