advancedxy commented on code in PR #494:
URL: https://github.com/apache/incubator-uniffle/pull/494#discussion_r1071969469
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -124,4 +133,30 @@ public long getPartitionDataSize(int shuffleId, int partitionId) {
return size;
}
+ public boolean hasHugePartition() {
+ return !(hugePartitionTags.size() == 0);
+ }
+
+ public int getHugePartitionSize() {
+ if (hugePartitionTags == null) {
Review Comment:
`hugePartitionTags` can't be null, right?
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -124,4 +133,30 @@ public long getPartitionDataSize(int shuffleId, int partitionId) {
return size;
}
+ public boolean hasHugePartition() {
+ return !(hugePartitionTags.size() == 0);
+ }
+
+ public int getHugePartitionSize() {
+ if (hugePartitionTags == null) {
+ return 0;
+ }
+ return hugePartitionTags.values().stream().map(x -> x.size()).reduce((x, y) -> x + y).orElse(0);
+ }
+
+ public void markHugePartition(int shuffleId, int partitionId) {
+ hugePartitionTags.computeIfAbsent(shuffleId, key -> {
+ ShuffleServerMetrics.gaugeAppWithHugePartitionNum.inc();
Review Comment:
If I understand the name correctly, `gaugeAppWithHugePartitionNum` should be the number of apps with huge partitions.
However, this code increments the gauge the first time a huge partition occurs in each shuffle. An app may have multiple shuffles, each with huge partitions, so the same app would be counted more than once.
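A minimal sketch of counting each app once, assuming a per-app flag is acceptable. `existHugePartition` is a hypothetical field name, and `APP_WITH_HUGE_PARTITION_GAUGE` is a plain `AtomicLong` standing in for `ShuffleServerMetrics.gaugeAppWithHugePartitionNum`; neither is taken from the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class AppHugePartitionGaugeSketch {
  // Hypothetical stand-in for ShuffleServerMetrics.gaugeAppWithHugePartitionNum,
  // shared by all apps on the server.
  static final AtomicLong APP_WITH_HUGE_PARTITION_GAUGE = new AtomicLong();

  // shuffleId -> ids of huge partitions in that shuffle
  private final Map<Integer, Set<Integer>> hugePartitionTags = new ConcurrentHashMap<>();

  // Hypothetical per-app flag: flips to true exactly once, on the app's first huge partition.
  private final AtomicBoolean existHugePartition = new AtomicBoolean(false);

  void markHugePartition(int shuffleId, int partitionId) {
    // compareAndSet succeeds for only one caller, so the gauge is incremented once per app,
    // no matter how many shuffles of this app contain huge partitions.
    if (existHugePartition.compareAndSet(false, true)) {
      APP_WITH_HUGE_PARTITION_GAUGE.incrementAndGet();
    }
    hugePartitionTags
        .computeIfAbsent(shuffleId, key -> ConcurrentHashMap.newKeySet())
        .add(partitionId);
  }
}
```

The matching decrement in `removeResources` would then check the same flag rather than the number of shuffles.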
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -574,11 +584,20 @@ public void removeResources(String appId) {
new AppPurgeEvent(appId, getUserByAppId(appId), new ArrayList<>(shuffleToCachedBlockIds.keySet()))
);
}
+ if (shuffleTaskInfo.hasHugePartition()) {
+ ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
+ ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize());
+ }
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
public void refreshAppId(String appId) {
- shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo()).setCurrentTimes(System.currentTimeMillis());
+ shuffleTaskInfos.computeIfAbsent(
+ appId,
+ x -> {
+ ShuffleServerMetrics.counterTotalAppNum.inc();
Review Comment:
This might be inaccurate. Also, don't we already have `org.apache.uniffle.server.ShuffleServerMetrics#gaugeAppNum` in the metrics?
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -124,4 +133,30 @@ public long getPartitionDataSize(int shuffleId, int partitionId) {
return size;
}
+ public boolean hasHugePartition() {
+ return !(hugePartitionTags.size() == 0);
+ }
+
+ public int getHugePartitionSize() {
+ if (hugePartitionTags == null) {
Review Comment:
Judging by how it is called, this should be named `getHugePartitionNum`.
"Size" is ambiguous, since it could also refer to the actual partition data size.
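A minimal sketch of the renamed accessor, assuming `hugePartitionTags` is made `final` and initialized at declaration (which is what makes the null check unnecessary). The class name and the `tag` helper are hypothetical and exist only to make the example runnable; `mapToInt(Set::size).sum()` replaces the `map(...).reduce(...).orElse(0)` chain with the same result.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class HugePartitionNumSketch {
  // Final and initialized at declaration, so it can never be null.
  // shuffleId -> ids of huge partitions in that shuffle
  private final Map<Integer, Set<Integer>> hugePartitionTags = new ConcurrentHashMap<>();

  // Hypothetical helper used only to populate the map in this example.
  void tag(int shuffleId, int partitionId) {
    hugePartitionTags
        .computeIfAbsent(shuffleId, key -> ConcurrentHashMap.newKeySet())
        .add(partitionId);
  }

  boolean hasHugePartition() {
    return !hugePartitionTags.isEmpty();
  }

  /** Number of huge partitions across all shuffles of the app: a count, not a byte size. */
  int getHugePartitionNum() {
    return hugePartitionTags.values().stream().mapToInt(Set::size).sum();
  }
}
```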
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -124,4 +133,30 @@ public long getPartitionDataSize(int shuffleId, int partitionId) {
return size;
}
+ public boolean hasHugePartition() {
+ return !(hugePartitionTags.size() == 0);
Review Comment:
I prefer `!hugePartitionTags.isEmpty()`.
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -52,15 +58,18 @@ public class ShuffleTaskInfo {
* shuffleId -> partitionId -> partition shuffle data size
*/
private Map<Integer, Map<Integer, Long>> partitionDataSizes;
+ private Map<Integer, Set<Integer>> hugePartitionTags;
Review Comment:
Could you add some documentation or a comment here?
Also, is it possible to reuse `partitionDataSizes` to calculate the huge partitions?
This runs in the shuffle server, so I'm very concerned about the memory consumed by the related metadata.
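One way to read the suggestion, as a minimal sketch: derive huge partitions on demand from `partitionDataSizes` instead of storing a second per-partition set. The `hugePartitionSizeThreshold` constructor parameter and the `addPartitionDataSize` helper are hypothetical; the sketch assumes "huge" means "data size above a fixed byte threshold". The trade-off is a scan over all partitions of the app each time the count is requested, in exchange for no extra metadata.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DerivedHugePartitionSketch {
  // shuffleId -> partitionId -> partition shuffle data size (mirrors partitionDataSizes)
  private final Map<Integer, Map<Integer, Long>> partitionDataSizes = new ConcurrentHashMap<>();

  // Hypothetical threshold in bytes above which a partition is considered huge.
  private final long hugePartitionSizeThreshold;

  DerivedHugePartitionSketch(long hugePartitionSizeThreshold) {
    this.hugePartitionSizeThreshold = hugePartitionSizeThreshold;
  }

  // Hypothetical helper: accumulates the data size written to a partition.
  void addPartitionDataSize(int shuffleId, int partitionId, long delta) {
    partitionDataSizes
        .computeIfAbsent(shuffleId, key -> new ConcurrentHashMap<>())
        .merge(partitionId, delta, Long::sum);
  }

  boolean isHugePartition(int shuffleId, int partitionId) {
    Map<Integer, Long> sizes = partitionDataSizes.get(shuffleId);
    return sizes != null && sizes.getOrDefault(partitionId, 0L) > hugePartitionSizeThreshold;
  }

  // Computed from existing data: no additional per-partition state is kept.
  long getHugePartitionNum() {
    return partitionDataSizes.values().stream()
        .flatMap(sizes -> sizes.values().stream())
        .filter(size -> size > hugePartitionSizeThreshold)
        .count();
  }
}
```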
##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java:
##########
@@ -124,4 +133,30 @@ public long getPartitionDataSize(int shuffleId, int partitionId) {
return size;
}
+ public boolean hasHugePartition() {
+ return !(hugePartitionTags.size() == 0);
+ }
+
+ public int getHugePartitionSize() {
+ if (hugePartitionTags == null) {
+ return 0;
+ }
+ return hugePartitionTags.values().stream().map(x -> x.size()).reduce((x, y) -> x + y).orElse(0);
+ }
+
+ public void markHugePartition(int shuffleId, int partitionId) {
+ hugePartitionTags.computeIfAbsent(shuffleId, key -> {
+ ShuffleServerMetrics.gaugeAppWithHugePartitionNum.inc();
+ ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.inc();
+ return Sets.newConcurrentHashSet();
+ });
+ Set<Integer> partitions = hugePartitionTags.get(shuffleId);
+ if (partitions.contains(partitionId)) {
+ return;
+ }
+ partitions.add(partitionId);
+ ShuffleServerMetrics.counterTotalHugePartitionNum.inc();
Review Comment:
Is it quite possible that the same partition id is marked multiple times?
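If repeated marks can happen, the `contains` check followed by `add` is also a check-then-act race: two concurrent calls for the same partition can both pass `contains` and both increment the counter. A minimal sketch that relies on `Set.add` returning `false` for an element already present, so the counter moves only once per partition. `totalHugePartitionCounter` is a plain `AtomicLong` standing in for `ShuffleServerMetrics.counterTotalHugePartitionNum`, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class IdempotentMarkSketch {
  // Hypothetical stand-in for ShuffleServerMetrics.counterTotalHugePartitionNum.
  final AtomicLong totalHugePartitionCounter = new AtomicLong();

  // shuffleId -> ids of huge partitions in that shuffle
  private final Map<Integer, Set<Integer>> hugePartitionTags = new ConcurrentHashMap<>();

  void markHugePartition(int shuffleId, int partitionId) {
    Set<Integer> partitions =
        hugePartitionTags.computeIfAbsent(shuffleId, key -> ConcurrentHashMap.newKeySet());
    // add() is atomic on a concurrent key set and returns false if the id was already there,
    // so repeated or concurrent marks of the same partition increment the counter only once.
    if (partitions.add(partitionId)) {
      totalHugePartitionCounter.incrementAndGet();
    }
  }
}
```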
--
This is an automated message from the Apache Git Service.