This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 116125c0 [ISSUE-378][HugePartition][Part-3] Introduce more metrics
about huge partition (#494)
116125c0 is described below
commit 116125c0e4b8f4d8da46a9d84a5f273681204134
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Jan 22 13:21:38 2023 +0800
[ISSUE-378][HugePartition][Part-3] Introduce more metrics about huge
partition (#494)
### What changes were proposed in this pull request?
Introduce more metrics about huge partitions:
__Counter__
1. total_require_buffer_failed_for_huge_partition
2. total_require_buffer_failed_for_regular_partition
3. total_app_num
4. total_app_with_huge_partition_num
5. total_partition_num
6. total_huge_partition_num
__Gauge__
1. huge_partition_num
2. app_with_huge_partition_num
### Why are the changes needed?
With these metrics, we can observe the concrete impact of huge partitions on
regular partitions, as well as the number of huge partitions on a single shuffle server.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Unit tests (UTs)
---
.../uniffle/server/ShuffleServerMetrics.java | 35 +++++++++
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 45 ++++++++++-
.../apache/uniffle/server/ShuffleTaskManager.java | 39 +++++++---
.../server/buffer/ShuffleBufferManager.java | 5 ++
.../apache/uniffle/server/ShuffleTaskInfoTest.java | 90 +++++++++++++++++++++-
.../uniffle/server/ShuffleTaskManagerTest.java | 12 +++
6 files changed, 212 insertions(+), 14 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 1bf42a92..1d4ddf50 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -71,6 +71,10 @@ public class ShuffleServerMetrics {
private static final String TOTAL_HDFS_WRITE_DATA = "total_hdfs_write_data";
private static final String TOTAL_LOCALFILE_WRITE_DATA =
"total_localfile_write_data";
private static final String TOTAL_REQUIRE_BUFFER_FAILED =
"total_require_buffer_failed";
+ private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION =
+ "total_require_buffer_failed_for_huge_partition";
+ private static final String
TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION =
+ "total_require_buffer_failed_for_regular_partition";
private static final String STORAGE_TOTAL_WRITE_LOCAL =
"storage_total_write_local";
private static final String STORAGE_RETRY_WRITE_LOCAL =
"storage_retry_write_local";
private static final String STORAGE_FAILED_WRITE_LOCAL =
"storage_failed_write_local";
@@ -80,6 +84,19 @@ public class ShuffleServerMetrics {
public static final String STORAGE_FAILED_WRITE_REMOTE_PREFIX =
"storage_failed_write_remote_";
public static final String STORAGE_SUCCESS_WRITE_REMOTE_PREFIX =
"storage_success_write_remote_";
+ private static final String TOTAL_APP_NUM = "total_app_num";
+ private static final String TOTAL_APP_WITH_HUGE_PARTITION_NUM =
"total_app_with_huge_partition_num";
+ private static final String TOTAL_PARTITION_NUM = "total_partition_num";
+ private static final String TOTAL_HUGE_PARTITION_NUM =
"total_huge_partition_num";
+
+ private static final String HUGE_PARTITION_NUM = "huge_partition_num";
+ private static final String APP_WITH_HUGE_PARTITION_NUM =
"app_with_huge_partition_num";
+
+ public static Counter counterTotalAppNum;
+ public static Counter counterTotalAppWithHugePartitionNum;
+ public static Counter counterTotalPartitionNum;
+ public static Counter counterTotalHugePartitionNum;
+
public static Counter counterTotalReceivedDataSize;
public static Counter counterTotalWriteDataSize;
public static Counter counterTotalWriteBlockSize;
@@ -101,6 +118,9 @@ public class ShuffleServerMetrics {
public static Counter counterTotalHdfsWriteDataSize;
public static Counter counterTotalLocalFileWriteDataSize;
public static Counter counterTotalRequireBufferFailed;
+ public static Counter counterTotalRequireBufferFailedForHugePartition;
+ public static Counter counterTotalRequireBufferFailedForRegularPartition;
+
public static Counter counterLocalStorageTotalWrite;
public static Counter counterLocalStorageRetryWrite;
public static Counter counterLocalStorageFailedWrite;
@@ -109,6 +129,9 @@ public class ShuffleServerMetrics {
public static Counter counterTotalRequireReadMemoryRetryNum;
public static Counter counterTotalRequireReadMemoryFailedNum;
+ public static Gauge gaugeHugePartitionNum;
+ public static Gauge gaugeAppWithHugePartitionNum;
+
public static Gauge gaugeLocalStorageTotalDirsNum;
public static Gauge gaugeLocalStorageCorruptedDirsNum;
public static Gauge gaugeLocalStorageTotalSpace;
@@ -245,6 +268,10 @@ public class ShuffleServerMetrics {
counterTotalHdfsWriteDataSize =
metricsManager.addCounter(TOTAL_HDFS_WRITE_DATA);
counterTotalLocalFileWriteDataSize =
metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA);
counterTotalRequireBufferFailed =
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED);
+ counterTotalRequireBufferFailedForRegularPartition =
+
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION);
+ counterTotalRequireBufferFailedForHugePartition =
+
metricsManager.addCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION);
counterLocalStorageTotalWrite =
metricsManager.addCounter(STORAGE_TOTAL_WRITE_LOCAL);
counterLocalStorageRetryWrite =
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
counterLocalStorageFailedWrite =
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
@@ -253,6 +280,11 @@ public class ShuffleServerMetrics {
counterTotalRequireReadMemoryRetryNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
counterTotalRequireReadMemoryFailedNum =
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
+ counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
+ counterTotalAppWithHugePartitionNum =
metricsManager.addCounter(TOTAL_APP_WITH_HUGE_PARTITION_NUM);
+ counterTotalPartitionNum = metricsManager.addCounter(TOTAL_PARTITION_NUM);
+ counterTotalHugePartitionNum =
metricsManager.addCounter(TOTAL_HUGE_PARTITION_NUM);
+
gaugeLocalStorageTotalDirsNum =
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_DIRS_NUM);
gaugeLocalStorageCorruptedDirsNum =
metricsManager.addGauge(LOCAL_STORAGE_CORRUPTED_DIRS_NUM);
gaugeLocalStorageTotalSpace =
metricsManager.addGauge(LOCAL_STORAGE_TOTAL_SPACE);
@@ -267,6 +299,9 @@ public class ShuffleServerMetrics {
gaugeEventQueueSize = metricsManager.addGauge(EVENT_QUEUE_SIZE);
gaugeAppNum = metricsManager.addGauge(APP_NUM_WITH_NODE);
gaugeTotalPartitionNum = metricsManager.addGauge(PARTITION_NUM_WITH_NODE);
+
+ gaugeHugePartitionNum = metricsManager.addGauge(HUGE_PARTITION_NUM);
+ gaugeAppWithHugePartitionNum =
metricsManager.addGauge(APP_WITH_HUGE_PARTITION_NUM);
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 46becf95..b5950377 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -18,12 +18,17 @@
package org.apache.uniffle.server;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
@@ -32,7 +37,9 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
* the information of the cache block, user and timestamp corresponding to the
app
*/
public class ShuffleTaskInfo {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ShuffleTaskInfo.class);
+ private final String appId;
private Long currentTimes;
/**
* shuffleId -> commit count
@@ -52,8 +59,14 @@ public class ShuffleTaskInfo {
* shuffleId -> partitionId -> partition shuffle data size
*/
private Map<Integer, Map<Integer, Long>> partitionDataSizes;
+ /**
+ * shuffleId -> huge partitionIds set
+ */
+ private final Map<Integer, Set> hugePartitionTags;
+ private final AtomicBoolean existHugePartition;
- public ShuffleTaskInfo() {
+ public ShuffleTaskInfo(String appId) {
+ this.appId = appId;
this.currentTimes = System.currentTimeMillis();
this.commitCounts = Maps.newConcurrentMap();
this.commitLocks = Maps.newConcurrentMap();
@@ -61,6 +74,8 @@ public class ShuffleTaskInfo {
this.user = new AtomicReference<>();
this.dataDistType = new AtomicReference<>();
this.partitionDataSizes = Maps.newConcurrentMap();
+ this.hugePartitionTags = Maps.newConcurrentMap();
+ this.existHugePartition = new AtomicBoolean(false);
}
public Long getCurrentTimes() {
@@ -100,12 +115,12 @@ public class ShuffleTaskInfo {
return dataDistType.get();
}
- public void addPartitionDataSize(int shuffleId, int partitionId, long delta)
{
+ public long addPartitionDataSize(int shuffleId, int partitionId, long delta)
{
totalDataSize.addAndGet(delta);
partitionDataSizes.computeIfAbsent(shuffleId, key ->
Maps.newConcurrentMap());
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
partitions.putIfAbsent(partitionId, 0L);
- partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
+ return partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
}
public long getTotalDataSize() {
@@ -124,4 +139,28 @@ public class ShuffleTaskInfo {
return size;
}
+ public boolean hasHugePartition() {
+ return existHugePartition.get();
+ }
+
+ public int getHugePartitionSize() {
+ return hugePartitionTags.values().stream().map(x -> x.size()).reduce((x,
y) -> x + y).orElse(0);
+ }
+
+ public void markHugePartition(int shuffleId, int partitionId) {
+ if (!existHugePartition.get()) {
+ boolean markedWithCAS = existHugePartition.compareAndSet(false, true);
+ if (markedWithCAS) {
+ ShuffleServerMetrics.gaugeAppWithHugePartitionNum.inc();
+ ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.inc();
+ }
+ }
+
+ Set<Integer> partitions = hugePartitionTags.computeIfAbsent(shuffleId, key
-> Sets.newConcurrentHashSet());
+ if (partitions.add(partitionId)) {
+ ShuffleServerMetrics.counterTotalHugePartitionNum.inc();
+ ShuffleServerMetrics.gaugeHugePartitionNum.inc();
+ LOGGER.warn("Huge partition occurs, appId: {}, shuffleId: {},
partitionId: {}", appId, shuffleId, partitionId);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 53f22c46..213c66dc 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -229,7 +229,7 @@ public class ShuffleTaskManager {
refreshAppId(appId);
Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
Roaring64NavigableMap cloneBlockIds;
- ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo(appId));
Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId,
x -> new Object());
synchronized (lock) {
long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT);
@@ -296,7 +296,7 @@ public class ShuffleTaskManager {
}
public int updateAndGetCommitCount(String appId, int shuffleId) {
- ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo(appId));
AtomicInteger commitNum = shuffleTaskInfo.getCommitCounts()
.computeIfAbsent(shuffleId, x -> new AtomicInteger(0));
return commitNum.incrementAndGet();
@@ -311,7 +311,7 @@ public class ShuffleTaskManager {
if (spbs == null || spbs.length == 0) {
return;
}
- ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo(appId));
Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
.computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
@@ -322,16 +322,22 @@ public class ShuffleTaskManager {
size += spb.getSize();
}
}
- shuffleTaskInfo.addPartitionDataSize(
+ long partitionSize = shuffleTaskInfo.addPartitionDataSize(
shuffleId,
partitionId,
size
);
+ if (shuffleBufferManager.isHugePartition(partitionSize)) {
+ shuffleTaskInfo.markHugePartition(
+ shuffleId,
+ partitionId
+ );
+ }
}
public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = shuffleTaskInfos
- .getOrDefault(appId, new ShuffleTaskInfo()).getCachedBlockIds();
+ .getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds();
Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId +
"], shuffleId[" + shuffleId + "]");
@@ -354,6 +360,7 @@ public class ShuffleTaskManager {
for (int partitionId : partitionIds) {
long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId,
partitionId);
if (shuffleBufferManager.limitHugePartition(appId, shuffleId,
partitionId, partitionUsedDataSize)) {
+
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
return -1;
}
}
@@ -368,6 +375,9 @@ public class ShuffleTaskManager {
requireBufferIds.put(requireId,
new PreAllocatedBufferInfo(requireId, System.currentTimeMillis(),
requireSize));
}
+ if (requireId == -1) {
+
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.inc();
+ }
return requireId;
}
@@ -560,12 +570,12 @@ public class ShuffleTaskManager {
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
final long start = System.currentTimeMillis();
- ShuffleTaskInfo shffleTaskInfo = shuffleTaskInfos.remove(appId);
- if (shffleTaskInfo == null) {
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
+ if (shuffleTaskInfo == null) {
LOG.info("Resource for appId[" + appId + "] had been removed before.");
return;
}
- final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds =
shffleTaskInfo.getCachedBlockIds();
+ final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds =
shuffleTaskInfo.getCachedBlockIds();
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
@@ -574,11 +584,20 @@ public class ShuffleTaskManager {
new AppPurgeEvent(appId, getUserByAppId(appId), new
ArrayList<>(shuffleToCachedBlockIds.keySet()))
);
}
+ if (shuffleTaskInfo.hasHugePartition()) {
+ ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
+
ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize());
+ }
LOG.info("Finish remove resource for appId[" + appId + "] cost " +
(System.currentTimeMillis() - start) + " ms");
}
public void refreshAppId(String appId) {
- shuffleTaskInfos.computeIfAbsent(appId, x -> new
ShuffleTaskInfo()).setCurrentTimes(System.currentTimeMillis());
+ shuffleTaskInfos.computeIfAbsent(
+ appId,
+ x -> {
+ ShuffleServerMetrics.counterTotalAppNum.inc();
+ return new ShuffleTaskInfo(appId);
+ }).setCurrentTimes(System.currentTimeMillis());
}
// check pre allocated buffer, release the memory if it expired
@@ -615,7 +634,7 @@ public class ShuffleTaskManager {
}
public String getUserByAppId(String appId) {
- return shuffleTaskInfos.computeIfAbsent(appId, x -> new
ShuffleTaskInfo()).getUser();
+ return shuffleTaskInfos.computeIfAbsent(appId, x -> new
ShuffleTaskInfo(appId)).getUser();
}
@VisibleForTesting
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index b4c7c2d4..20998dab 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -102,6 +102,7 @@ public class ShuffleBufferManager {
shuffleIdToBuffers.putIfAbsent(shuffleId, TreeRangeMap.create());
RangeMap<Integer, ShuffleBuffer> bufferRangeMap =
shuffleIdToBuffers.get(shuffleId);
if (bufferRangeMap.get(startPartition) == null) {
+ ShuffleServerMetrics.counterTotalPartitionNum.inc();
ShuffleServerMetrics.gaugeTotalPartitionNum.inc();
bufferRangeMap.put(Range.closed(startPartition, endPartition), new
ShuffleBuffer(bufferSize));
} else {
@@ -564,6 +565,10 @@ public class ShuffleBufferManager {
}
}
+ public boolean isHugePartition(long usedPartitionDataSize) {
+ return usedPartitionDataSize > hugePartitionSizeThreshold;
+ }
+
public boolean limitHugePartition(String appId, int shuffleId, int
partitionId, long usedPartitionDataSize) {
if (usedPartitionDataSize > hugePartitionSizeThreshold) {
long memoryUsed = getShuffleBufferEntry(appId, shuffleId,
partitionId).getValue().getSize();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
index 4c407a6e..b08a6c6c 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
@@ -17,15 +17,103 @@
package org.apache.uniffle.server;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShuffleTaskInfoTest {
+ @BeforeEach
+ public void setup() {
+ ShuffleServerMetrics.register();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ ShuffleServerMetrics.clear();
+ }
+
+ @Test
+ public void hugePartitionConcurrentTest() throws InterruptedException {
+ ShuffleTaskInfo shuffleTaskInfo = new
ShuffleTaskInfo("hugePartitionConcurrentTest_appId");
+
+ // case1
+ int n = 10;
+ final CyclicBarrier barrier = new CyclicBarrier(n);
+ final CountDownLatch countDownLatch = new CountDownLatch(n);
+ ExecutorService executorService = Executors.newFixedThreadPool(n);
+ IntStream.range(0, n).forEach(i -> executorService.submit(() -> {
+ try {
+ barrier.await();
+ shuffleTaskInfo.markHugePartition(i, i);
+ } catch (Exception e) {
+ // ignore
+ } finally {
+ countDownLatch.countDown();
+ }
+ }));
+ countDownLatch.await();
+ assertEquals(1,
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+ assertEquals(n, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+ assertEquals(n, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+
+ // case2
+ ShuffleServerMetrics.clear();
+ ShuffleServerMetrics.register();
+ barrier.reset();
+ CountDownLatch latch = new CountDownLatch(n);
+ ShuffleTaskInfo taskInfo = new
ShuffleTaskInfo("hugePartitionConcurrentTest_appId");
+ IntStream.range(0, n).forEach(i -> executorService.submit(() -> {
+ try {
+ barrier.await();
+ taskInfo.markHugePartition(1, 1);
+ } catch (Exception e) {
+ // ignore
+ } finally {
+ latch.countDown();
+ }
+ }));
+ latch.await();
+ assertEquals(1,
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void hugePartitionTest() {
+ ShuffleTaskInfo shuffleTaskInfo = new
ShuffleTaskInfo("hugePartition_appId");
+
+ // case1
+ assertFalse(shuffleTaskInfo.hasHugePartition());
+ assertEquals(0, shuffleTaskInfo.getHugePartitionSize());
+
+ // case2
+ shuffleTaskInfo.markHugePartition(1, 1);
+ shuffleTaskInfo.markHugePartition(1, 2);
+ shuffleTaskInfo.markHugePartition(2, 2);
+ assertTrue(shuffleTaskInfo.hasHugePartition());
+ assertEquals(3, shuffleTaskInfo.getHugePartitionSize());
+ assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+ assertEquals(1,
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+ }
+
@Test
public void partitionSizeSummaryTest() {
- ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+ ShuffleTaskInfo shuffleTaskInfo = new
ShuffleTaskInfo("partitionSizeSummaryTest_appId");
// case1
long size = shuffleTaskInfo.getPartitionDataSize(0, 0);
assertEquals(0, size);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 43cd5ac1..9c2af94d 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -119,6 +119,18 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
500);
assertEquals(-1, requiredId);
+ // metrics test
+ assertEquals(1,
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
+ assertEquals(0,
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());
+ assertEquals(1,
ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+ assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
+
+ // case4
+ shuffleTaskManager.removeResources(appId);
+ assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
+ assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
}
@Test