This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 116125c0 [ISSUE-378][HugePartition][Part-3] Introduce more metrics 
about huge partition (#494)
116125c0 is described below

commit 116125c0e4b8f4d8da46a9d84a5f273681204134
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Jan 22 13:21:38 2023 +0800

    [ISSUE-378][HugePartition][Part-3] Introduce more metrics about huge 
partition (#494)
    
    ### What changes were proposed in this pull request?
    Introduce more metrics about huge partition
    __Counter__
    1. total_require_buffer_failed_for_huge_partition
    2. total_require_buffer_failed_for_regular_partition
    3. total_app_num
    4. total_app_with_huge_partition_num
    5. total_partition_num
    6. total_huge_partition_num
    
    __Gauge__
    1. huge_partition_num
    2. app_with_huge_partition_num
    
    ### Why are the changes needed?
    With these metrics, we can observe the concrete impact of huge partitions
    on regular partitions, as well as the number of huge partitions on a single
    shuffle server.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    1. UTs
---
 .../uniffle/server/ShuffleServerMetrics.java       | 35 +++++++++
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 45 ++++++++++-
 .../apache/uniffle/server/ShuffleTaskManager.java  | 39 +++++++---
 .../server/buffer/ShuffleBufferManager.java        |  5 ++
 .../apache/uniffle/server/ShuffleTaskInfoTest.java | 90 +++++++++++++++++++++-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 12 +++
 6 files changed, 212 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 1bf42a92..1d4ddf50 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -71,6 +71,10 @@ public class ShuffleServerMetrics {
   private static final String TOTAL_HDFS_WRITE_DATA = "total_hdfs_write_data";
   private static final String TOTAL_LOCALFILE_WRITE_DATA = 
"total_localfile_write_data";
   private static final String TOTAL_REQUIRE_BUFFER_FAILED = 
"total_require_buffer_failed";
+  private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION =
+      "total_require_buffer_failed_for_huge_partition";
+  private static final String 
TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION =
+      "total_require_buffer_failed_for_regular_partition";
   private static final String STORAGE_TOTAL_WRITE_LOCAL = 
"storage_total_write_local";
   private static final String STORAGE_RETRY_WRITE_LOCAL = 
"storage_retry_write_local";
   private static final String STORAGE_FAILED_WRITE_LOCAL = 
"storage_failed_write_local";
@@ -80,6 +84,19 @@ public class ShuffleServerMetrics {
   public static final String STORAGE_FAILED_WRITE_REMOTE_PREFIX = 
"storage_failed_write_remote_";
   public static final String STORAGE_SUCCESS_WRITE_REMOTE_PREFIX = 
"storage_success_write_remote_";
 
+  private static final String TOTAL_APP_NUM = "total_app_num";
+  private static final String TOTAL_APP_WITH_HUGE_PARTITION_NUM = 
"total_app_with_huge_partition_num";
+  private static final String TOTAL_PARTITION_NUM = "total_partition_num";
+  private static final String TOTAL_HUGE_PARTITION_NUM = 
"total_huge_partition_num";
+
+  private static final String HUGE_PARTITION_NUM = "huge_partition_num";
+  private static final String APP_WITH_HUGE_PARTITION_NUM = 
"app_with_huge_partition_num";
+
+  public static Counter counterTotalAppNum;
+  public static Counter counterTotalAppWithHugePartitionNum;
+  public static Counter counterTotalPartitionNum;
+  public static Counter counterTotalHugePartitionNum;
+
   public static Counter counterTotalReceivedDataSize;
   public static Counter counterTotalWriteDataSize;
   public static Counter counterTotalWriteBlockSize;
@@ -101,6 +118,9 @@ public class ShuffleServerMetrics {
   public static Counter counterTotalHdfsWriteDataSize;
   public static Counter counterTotalLocalFileWriteDataSize;
   public static Counter counterTotalRequireBufferFailed;
+  public static Counter counterTotalRequireBufferFailedForHugePartition;
+  public static Counter counterTotalRequireBufferFailedForRegularPartition;
+
   public static Counter counterLocalStorageTotalWrite;
   public static Counter counterLocalStorageRetryWrite;
   public static Counter counterLocalStorageFailedWrite;
@@ -109,6 +129,9 @@ public class ShuffleServerMetrics {
   public static Counter counterTotalRequireReadMemoryRetryNum;
   public static Counter counterTotalRequireReadMemoryFailedNum;
 
+  public static Gauge gaugeHugePartitionNum;
+  public static Gauge gaugeAppWithHugePartitionNum;
+
   public static Gauge gaugeLocalStorageTotalDirsNum;
   public static Gauge gaugeLocalStorageCorruptedDirsNum;
   public static Gauge gaugeLocalStorageTotalSpace;
@@ -245,6 +268,10 @@ public class ShuffleServerMetrics {
     counterTotalHdfsWriteDataSize = 
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
     counterTotalLocalFileWriteDataSize = 
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
     counterTotalRequireBufferFailed = 
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
+    counterTotalRequireBufferFailedForRegularPartition =
+        
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
+    counterTotalRequireBufferFailedForHugePartition =
+        
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
     counterLocalStorageTotalWrite = 
metricsManager.addCounter(STORAGE_TOTAL_WRITE_LOCAL);
     counterLocalStorageRetryWrite = 
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
     counterLocalStorageFailedWrite = 
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
@@ -253,6 +280,11 @@ public class ShuffleServerMetrics {
     counterTotalRequireReadMemoryRetryNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
     counterTotalRequireReadMemoryFailedNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
 
+    counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
+    counterTotalAppWithHugePartitionNum = 
metricsManager.addCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
+    counterTotalPartitionNum = metricsManager.addCounter(TOTAL_PARTITION_NUM);
+    counterTotalHugePartitionNum = 
metricsManager.addCounter(TOTAL_HUGE_PARTITION_NUM);
+
     gaugeLocalStorageTotalDirsNum = 
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
     gaugeLocalStorageCorruptedDirsNum = 
metricsManager.addGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
     gaugeLocalStorageTotalSpace = 
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_SPACE);
@@ -267,6 +299,9 @@ public class ShuffleServerMetrics {
     gaugeEventQueueSize = metricsManager.addGauge(EVENT_QUEUE_SIZE);
     gaugeAppNum = metricsManager.addGauge(APP_NUM_WITH_NODE);
     gaugeTotalPartitionNum = metricsManager.addGauge(PARTITION_NUM_WITH_NODE);
+
+    gaugeHugePartitionNum = metricsManager.addGauge(HUGE_PARTITION_NUM);
+    gaugeAppWithHugePartitionNum = 
metricsManager.addGauge(APP_WITH_HUGE_PARTITION_NUM);
   }
 
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 46becf95..b5950377 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -18,12 +18,17 @@
 package org.apache.uniffle.server;
 
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 
@@ -32,7 +37,9 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
  * the information of the cache block, user and timestamp corresponding to the 
app
  */
 public class ShuffleTaskInfo {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ShuffleTaskInfo.class);
 
+  private final String appId;
   private Long currentTimes;
   /**
    * shuffleId -> commit count
@@ -52,8 +59,14 @@ public class ShuffleTaskInfo {
    * shuffleId -> partitionId -> partition shuffle data size
    */
   private Map<Integer, Map<Integer, Long>> partitionDataSizes;
+  /**
+   * shuffleId -> huge partitionIds set
+   */
+  private final Map<Integer, Set> hugePartitionTags;
+  private final AtomicBoolean existHugePartition;
 
-  public ShuffleTaskInfo() {
+  public ShuffleTaskInfo(String appId) {
+    this.appId = appId;
     this.currentTimes = System.currentTimeMillis();
     this.commitCounts = Maps.newConcurrentMap();
     this.commitLocks = Maps.newConcurrentMap();
@@ -61,6 +74,8 @@ public class ShuffleTaskInfo {
     this.user = new AtomicReference<>();
     this.dataDistType = new AtomicReference<>();
     this.partitionDataSizes = Maps.newConcurrentMap();
+    this.hugePartitionTags = Maps.newConcurrentMap();
+    this.existHugePartition = new AtomicBoolean(false);
   }
 
   public Long getCurrentTimes() {
@@ -100,12 +115,12 @@ public class ShuffleTaskInfo {
     return dataDistType.get();
   }
 
-  public void addPartitionDataSize(int shuffleId, int partitionId, long delta) 
{
+  public long addPartitionDataSize(int shuffleId, int partitionId, long delta) 
{
     totalDataSize.addAndGet(delta);
     partitionDataSizes.computeIfAbsent(shuffleId, key -> 
Maps.newConcurrentMap());
     Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
     partitions.putIfAbsent(partitionId, 0L);
-    partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
+    return partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
   }
 
   public long getTotalDataSize() {
@@ -124,4 +139,28 @@ public class ShuffleTaskInfo {
     return size;
   }
 
+  public boolean hasHugePartition() {
+    return existHugePartition.get();
+  }
+
+  public int getHugePartitionSize() {
+    return hugePartitionTags.values().stream().map(x -> x.size()).reduce((x, 
y) -> x + y).orElse(0);
+  }
+
+  public void markHugePartition(int shuffleId, int partitionId) {
+    if (!existHugePartition.get()) {
+      boolean markedWithCAS = existHugePartition.compareAndSet(false, true);
+      if (markedWithCAS) {
+        ShuffleServerMetrics.gaugeAppWithHugePartitionNum.inc();
+        ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.inc();
+      }
+    }
+
+    Set<Integer> partitions = hugePartitionTags.computeIfAbsent(shuffleId, key 
-> Sets.newConcurrentHashSet());
+    if (partitions.add(partitionId)) {
+      ShuffleServerMetrics.counterTotalHugePartitionNum.inc();
+      ShuffleServerMetrics.gaugeHugePartitionNum.inc();
+      LOGGER.warn("Huge partition occurs, appId: {}, shuffleId: {}, 
partitionId: {}", appId, shuffleId, partitionId);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 53f22c46..213c66dc 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -229,7 +229,7 @@ public class ShuffleTaskManager {
     refreshAppId(appId);
     Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
     Roaring64NavigableMap cloneBlockIds;
-    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo(appId));
     Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, 
x -> new Object());
     synchronized (lock) {
       long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT);
@@ -296,7 +296,7 @@ public class ShuffleTaskManager {
   }
 
   public int updateAndGetCommitCount(String appId, int shuffleId) {
-    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo(appId));
     AtomicInteger commitNum = shuffleTaskInfo.getCommitCounts()
         .computeIfAbsent(shuffleId, x -> new AtomicInteger(0));
     return commitNum.incrementAndGet();
@@ -311,7 +311,7 @@ public class ShuffleTaskManager {
     if (spbs == null || spbs.length == 0) {
       return;
     }
-    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo(appId));
     Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
         .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
 
@@ -322,16 +322,22 @@ public class ShuffleTaskManager {
         size += spb.getSize();
       }
     }
-    shuffleTaskInfo.addPartitionDataSize(
+    long partitionSize = shuffleTaskInfo.addPartitionDataSize(
         shuffleId,
         partitionId,
         size
     );
+    if (shuffleBufferManager.isHugePartition(partitionSize)) {
+      shuffleTaskInfo.markHugePartition(
+          shuffleId,
+          partitionId
+      );
+    }
   }
 
   public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
     Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = shuffleTaskInfos
-        .getOrDefault(appId, new ShuffleTaskInfo()).getCachedBlockIds();
+        .getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds();
     Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
     if (blockIds == null) {
       LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId + 
"], shuffleId[" + shuffleId + "]");
@@ -354,6 +360,7 @@ public class ShuffleTaskManager {
       for (int partitionId : partitionIds) {
         long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
         if (shuffleBufferManager.limitHugePartition(appId, shuffleId, 
partitionId, partitionUsedDataSize)) {
+          
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
           return -1;
         }
       }
@@ -368,6 +375,9 @@ public class ShuffleTaskManager {
       requireBufferIds.put(requireId,
           new PreAllocatedBufferInfo(requireId, System.currentTimeMillis(), 
requireSize));
     }
+    if (requireId == -1) {
+      
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.inc();
+    }
     return requireId;
   }
 
@@ -560,12 +570,12 @@ public class ShuffleTaskManager {
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
     final long start = System.currentTimeMillis();
-    ShuffleTaskInfo shffleTaskInfo = shuffleTaskInfos.remove(appId);
-    if (shffleTaskInfo == null) {
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
+    if (shuffleTaskInfo == null) {
       LOG.info("Resource for appId[" + appId + "] had been removed before.");
       return;
     }
-    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = 
shffleTaskInfo.getCachedBlockIds();
+    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = 
shuffleTaskInfo.getCachedBlockIds();
     partitionsToBlockIds.remove(appId);
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
@@ -574,11 +584,20 @@ public class ShuffleTaskManager {
           new AppPurgeEvent(appId, getUserByAppId(appId), new 
ArrayList<>(shuffleToCachedBlockIds.keySet()))
       );
     }
+    if (shuffleTaskInfo.hasHugePartition()) {
+      ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
+      
ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize());
+    }
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + 
(System.currentTimeMillis() - start) + " ms");
   }
 
   public void refreshAppId(String appId) {
-    shuffleTaskInfos.computeIfAbsent(appId, x -> new 
ShuffleTaskInfo()).setCurrentTimes(System.currentTimeMillis());
+    shuffleTaskInfos.computeIfAbsent(
+        appId,
+        x -> {
+          ShuffleServerMetrics.counterTotalAppNum.inc();
+          return new ShuffleTaskInfo(appId);
+        }).setCurrentTimes(System.currentTimeMillis());
   }
 
   // check pre allocated buffer, release the memory if it expired
@@ -615,7 +634,7 @@ public class ShuffleTaskManager {
   }
 
   public String getUserByAppId(String appId) {
-    return shuffleTaskInfos.computeIfAbsent(appId, x -> new 
ShuffleTaskInfo()).getUser();
+    return shuffleTaskInfos.computeIfAbsent(appId, x -> new 
ShuffleTaskInfo(appId)).getUser();
   }
 
   @VisibleForTesting
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index b4c7c2d4..20998dab 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -102,6 +102,7 @@ public class ShuffleBufferManager {
     shuffleIdToBuffers.putIfAbsent(shuffleId, TreeRangeMap.create());
     RangeMap<Integer, ShuffleBuffer> bufferRangeMap = 
shuffleIdToBuffers.get(shuffleId);
     if (bufferRangeMap.get(startPartition) == null) {
+      ShuffleServerMetrics.counterTotalPartitionNum.inc();
       ShuffleServerMetrics.gaugeTotalPartitionNum.inc();
       bufferRangeMap.put(Range.closed(startPartition, endPartition), new 
ShuffleBuffer(bufferSize));
     } else {
@@ -564,6 +565,10 @@ public class ShuffleBufferManager {
     }
   }
 
+  public boolean isHugePartition(long usedPartitionDataSize) {
+    return usedPartitionDataSize > hugePartitionSizeThreshold;
+  }
+
   public boolean limitHugePartition(String appId, int shuffleId, int 
partitionId, long usedPartitionDataSize) {
     if (usedPartitionDataSize > hugePartitionSizeThreshold) {
       long memoryUsed = getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue().getSize();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
index 4c407a6e..b08a6c6c 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
@@ -17,15 +17,103 @@
 
 package org.apache.uniffle.server;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShuffleTaskInfoTest {
 
+  @BeforeEach
+  public void setup() {
+    ShuffleServerMetrics.register();
+  }
+
+  @AfterEach
+  public void tearDown() {
+    ShuffleServerMetrics.clear();
+  }
+
+  @Test
+  public void hugePartitionConcurrentTest() throws InterruptedException {
+    ShuffleTaskInfo shuffleTaskInfo = new 
ShuffleTaskInfo("hugePartitionConcurrentTest_appId");
+
+    // case1
+    int n = 10;
+    final CyclicBarrier barrier = new CyclicBarrier(n);
+    final CountDownLatch countDownLatch = new CountDownLatch(n);
+    ExecutorService executorService = Executors.newFixedThreadPool(n);
+    IntStream.range(0, n).forEach(i -> executorService.submit(() -> {
+      try {
+        barrier.await();
+        shuffleTaskInfo.markHugePartition(i, i);
+      } catch (Exception e) {
+        // ignore
+      } finally {
+        countDownLatch.countDown();
+      }
+    }));
+    countDownLatch.await();
+    assertEquals(1, 
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+    assertEquals(n, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+    assertEquals(n, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+
+    // case2
+    ShuffleServerMetrics.clear();
+    ShuffleServerMetrics.register();
+    barrier.reset();
+    CountDownLatch latch = new CountDownLatch(n);
+    ShuffleTaskInfo taskInfo = new 
ShuffleTaskInfo("hugePartitionConcurrentTest_appId");
+    IntStream.range(0, n).forEach(i -> executorService.submit(() -> {
+      try {
+        barrier.await();
+        taskInfo.markHugePartition(1, 1);
+      } catch (Exception e) {
+        // ignore
+      } finally {
+        latch.countDown();
+      }
+    }));
+    latch.await();
+    assertEquals(1, 
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void hugePartitionTest() {
+    ShuffleTaskInfo shuffleTaskInfo = new 
ShuffleTaskInfo("hugePartition_appId");
+
+    // case1
+    assertFalse(shuffleTaskInfo.hasHugePartition());
+    assertEquals(0, shuffleTaskInfo.getHugePartitionSize());
+
+    // case2
+    shuffleTaskInfo.markHugePartition(1, 1);
+    shuffleTaskInfo.markHugePartition(1, 2);
+    shuffleTaskInfo.markHugePartition(2, 2);
+    assertTrue(shuffleTaskInfo.hasHugePartition());
+    assertEquals(3, shuffleTaskInfo.getHugePartitionSize());
+    assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+    assertEquals(1, 
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+  }
+
   @Test
   public void partitionSizeSummaryTest() {
-    ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+    ShuffleTaskInfo shuffleTaskInfo = new 
ShuffleTaskInfo("partitionSizeSummaryTest_appId");
     // case1
     long size = shuffleTaskInfo.getPartitionDataSize(0, 0);
     assertEquals(0, size);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 43cd5ac1..9c2af94d 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -119,6 +119,18 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
     requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
500);
     assertEquals(-1, requiredId);
+    // metrics test
+    assertEquals(1, 
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
+    assertEquals(0, 
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());
+    assertEquals(1, 
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+    assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+
+    // case4
+    shuffleTaskManager.removeResources(appId);
+    assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+    assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
   }
 
   @Test

Reply via email to