This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b43bbd863 [#333] feat(server): expose metrics of TopN app bytes in one shuffle server (#1400)
b43bbd863 is described below

commit b43bbd86349c66e8384a093c3b70293f37689e33
Author: Qing <[email protected]>
AuthorDate: Fri Jan 12 13:47:38 2024 +0800

    [#333] feat(server): expose metrics of TopN app bytes in one shuffle server (#1400)
    
    ### What changes were proposed in this pull request?
    
    expose metrics of the top N (configurable, default 10) apps by shuffle data bytes in one shuffle server #333
    
    ### Why are the changes needed?
    
    Fix:
    https://github.com/apache/incubator-uniffle/issues/333
    https://github.com/apache/incubator-uniffle/issues/681
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test
---
 .../apache/uniffle/server/ShuffleFlushManager.java |  21 +-
 .../org/apache/uniffle/server/ShuffleServer.java   |   3 +
 .../apache/uniffle/server/ShuffleServerConf.java   |  13 ++
 .../uniffle/server/ShuffleServerMetrics.java       |  41 ++++
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  48 ++++-
 .../apache/uniffle/server/ShuffleTaskManager.java  |  12 ++
 .../server/TopNShuffleDataSizeOfAppCalcTask.java   | 145 +++++++++++++
 .../uniffle/server/ShuffleServerMetricsTest.java   |   3 +-
 .../TopNShuffleDataSizeOfAppCalcTaskTest.java      | 238 +++++++++++++++++++++
 9 files changed, 516 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index eec574179..abf9bddf7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
@@ -104,13 +105,27 @@ public class ShuffleFlushManager {
   private void recordSuccess(ShuffleDataFlushEvent event, long start) {
     updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
     ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
-    event.doCleanup();
-    if (shuffleServer != null) {
-      if (LOG.isDebugEnabled()) {
-        long duration = System.currentTimeMillis() - start;
-        LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+
+    // Account the flushed bytes to the app's local-file or Hadoop size in its ShuffleTaskInfo.
+    // shuffleServer may be null; per-app accounting is skipped in that case.
+    ShuffleTaskInfo shuffleTaskInfo =
+        shuffleServer == null
+            ? null
+            : shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+    if (null != shuffleTaskInfo) {
+      String storageHost = event.getUnderStorage().getStorageHost();
+      if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
+        shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
+      } else {
+        shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
       }
     }
+
+    event.doCleanup();
+    if (LOG.isDebugEnabled()) {
+      long duration = System.currentTimeMillis() - start;
+      LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+    }
   }
 
   public void processEvent(ShuffleDataFlushEvent event) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index d7b3bbcb2..eac948470 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -193,6 +193,10 @@ public class ShuffleServer {
     if (executorService != null) {
       executorService.shutdownNow();
     }
+    // Stops the background TopN app data size task owned by the task manager.
+    if (shuffleTaskManager != null) {
+      shuffleTaskManager.stop();
+    }
     running = false;
     LOG.info("RPC Server Stopped!");
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 0af62d420..77287b2a7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -538,6 +538,19 @@ public class ShuffleServerConf extends RssBaseConf {
                   + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB."
                   + "Default is 0, OS will dynamically adjust the buf size.");
 
+  public static final ConfigOption<Integer> TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER =
+      ConfigOptions.key("rss.server.topN.appShuffleDataSize.number")
+          .intType()
+          .defaultValue(10)
+          .withDescription("number of topN shuffle data size of app level.");
+
+  public static final ConfigOption<Integer> TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL =
+      ConfigOptions.key("rss.server.topN.appShuffleDataSize.refreshIntervalMs")
+          .intType()
+          .defaultValue(1000)
+          .withDescription(
+              "refresh interval in ms for TopN shuffle data size of app level calc task.");
+
   public static final ConfigOption<Integer> SUMMARY_METRIC_WAIT_QUEUE_SIZE =
       ConfigOptions.key("rss.server.summary.metric.wait.queue.size")
           .intType()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index dde9bb319..400712076 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -111,6 +111,15 @@ public class ShuffleServerMetrics {
   private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
       "total_remove_resource_by_shuffle_ids_time";
 
+  // Names of the per-app TopN gauges (labelled by app_id) that are refreshed periodically.
+  public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP = "topN_of_total_data_size_for_app";
+  public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP =
+      "topN_of_in_memory_data_size_for_app";
+  public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP =
+      "topN_of_on_localfile_data_size_for_app";
+  public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
+      "topN_of_on_hadoop_data_size_for_app";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
@@ -170,6 +179,11 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeAppNum;
   public static Gauge.Child gaugeTotalPartitionNum;
 
+  public static Gauge gaugeTotalDataSizeUsage;
+  public static Gauge gaugeInMemoryDataSizeUsage;
+  public static Gauge gaugeOnDiskDataSizeUsage;
+  public static Gauge gaugeOnHadoopDataSizeUsage;
+
   public static Counter counterRemoteStorageTotalWrite;
   public static Counter counterRemoteStorageRetryWrite;
   public static Counter counterRemoteStorageFailedWrite;
@@ -347,5 +361,33 @@ public class ShuffleServerMetrics {
     summaryTotalRemoveResourceTime = metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
     summaryTotalRemoveResourceByShuffleIdsTime =
         metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
+
+    gaugeTotalDataSizeUsage =
+        Gauge.build()
+            .name(TOPN_OF_TOTAL_DATA_SIZE_FOR_APP)
+            .help("top N of total shuffle data size for app level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
+
+    gaugeInMemoryDataSizeUsage =
+        Gauge.build()
+            .name(TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP)
+            .help("top N of in memory shuffle data size for app level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
+
+    gaugeOnDiskDataSizeUsage =
+        Gauge.build()
+            .name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP)
+            .help("top N of on disk shuffle data size for app level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
+
+    gaugeOnHadoopDataSizeUsage =
+        Gauge.build()
+            .name(TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP)
+            .help("top N of on hadoop shuffle data size for app level")
+            .labelNames("app_id")
+            .register(metricsManager.getCollectorRegistry());
   }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 10a7fb8fc..e657ec4db 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -50,7 +50,11 @@ public class ShuffleTaskInfo {
 
   private AtomicReference<String> user;
 
-  private AtomicLong totalDataSize = new AtomicLong(0);
+  private final AtomicLong totalDataSize = new AtomicLong(0);
+  private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+  private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
+  private final AtomicLong onHadoopDataSize = new AtomicLong(0);
+
   /** shuffleId -> partitionId -> partition shuffle data size */
   private Map<Integer, Map<Integer, Long>> partitionDataSizes;
   /** shuffleId -> huge partitionIds set */
@@ -115,6 +119,7 @@ public class ShuffleTaskInfo {
 
   public long addPartitionDataSize(int shuffleId, int partitionId, long delta) {
     totalDataSize.addAndGet(delta);
+    inMemoryDataSize.addAndGet(delta);
     partitionDataSizes.computeIfAbsent(shuffleId, key -> JavaUtils.newConcurrentMap());
     Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
     partitions.putIfAbsent(partitionId, 0L);
@@ -125,6 +130,30 @@ public class ShuffleTaskInfo {
     return totalDataSize.get();
   }
 
+  public long getInMemoryDataSize() {
+    return inMemoryDataSize.get();
+  }
+
+  /** Moves {@code delta} bytes from the in-memory counter to the local-file counter. */
+  public long addOnLocalFileDataSize(long delta) {
+    inMemoryDataSize.addAndGet(-delta);
+    return onLocalFileDataSize.addAndGet(delta);
+  }
+
+  public long getOnLocalFileDataSize() {
+    return onLocalFileDataSize.get();
+  }
+
+  /** Moves {@code delta} bytes from the in-memory counter to the Hadoop counter. */
+  public long addOnHadoopDataSize(long delta) {
+    inMemoryDataSize.addAndGet(-delta);
+    return onHadoopDataSize.addAndGet(delta);
+  }
+
+  public long getOnHadoopDataSize() {
+    return onHadoopDataSize.get();
+  }
+
   public long getPartitionDataSize(int shuffleId, int partitionId) {
     Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
     if (partitions == null) {
@@ -166,4 +195,23 @@ public class ShuffleTaskInfo {
           partitionId);
     }
   }
+
+  @Override
+  public String toString() {
+    return "ShuffleTaskInfo{"
+        + "appId='"
+        + appId
+        + '\''
+        + ", totalDataSize="
+        + totalDataSize
+        + ", inMemoryDataSize="
+        + inMemoryDataSize
+        + ", onLocalFileDataSize="
+        + onLocalFileDataSize
+        + ", onHadoopDataSize="
+        + onHadoopDataSize
+        + ", partitionDataSizes="
+        + partitionDataSizes
+        + '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2148aab9a..90928f3b8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -88,6 +88,7 @@ public class ShuffleTaskManager {
   private final ScheduledExecutorService expiredAppCleanupExecutorService;
   private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
   private ScheduledExecutorService triggerFlushExecutorService;
+  private final TopNShuffleDataSizeOfAppCalcTask topNShuffleDataSizeOfAppCalcTask;
   private final StorageManager storageManager;
   private AtomicLong requireBufferId = new AtomicLong(0);
   private ShuffleServerConf conf;
@@ -207,6 +208,9 @@ public class ShuffleTaskManager {
     clearResourceThread = new Thread(clearResourceRunnable);
     clearResourceThread.setName("clearResourceThread");
     clearResourceThread.setDaemon(true);
+
+    topNShuffleDataSizeOfAppCalcTask = new TopNShuffleDataSizeOfAppCalcTask(this, conf);
+    topNShuffleDataSizeOfAppCalcTask.start();
   }
 
   private Lock getAppLock(String appId) {
@@ -839,6 +843,16 @@ public class ShuffleTaskManager {
     }
   }
 
+  /** Returns the live appId-to-ShuffleTaskInfo map of this manager, not a copy. */
+  public Map<String, ShuffleTaskInfo> getShuffleTaskInfos() {
+    return shuffleTaskInfos;
+  }
+
+  /** Stops the TopN app shuffle data size calculation task started by the constructor. */
+  public void stop() {
+    topNShuffleDataSizeOfAppCalcTask.stop();
+  }
+
   public void start() {
     clearResourceThread.start();
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
new file mode 100644
index 000000000..1b22c64fc
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+import io.prometheus.client.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically ranks the applications of this shuffle server by shuffle data size and publishes
+ * the top N of each ranking as Prometheus gauges labelled with the application id.
+ *
+ * <p>Four rankings are maintained (total, in-memory, on-local-file and on-Hadoop data size, as
+ * reported by {@link ShuffleTaskInfo}). N and the refresh interval are read from {@link
+ * ShuffleServerConf#TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER} and {@link
+ * ShuffleServerConf#TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL}.
+ */
+public class TopNShuffleDataSizeOfAppCalcTask {
+  private static final Logger LOG = LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class);
+
+  private final int topNShuffleDataNumber;
+  private final int topNShuffleDataTaskRefreshInterval;
+
+  private final Gauge gaugeTotalDataSize;
+  private final Gauge gaugeInMemoryDataSize;
+  private final Gauge gaugeOnLocalFileDataSize;
+  private final Gauge gaugeOnHadoopDataSize;
+
+  private final ShuffleTaskManager shuffleTaskManager;
+  private final ScheduledExecutorService scheduler;
+
+  public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleServerConf conf) {
+    // Stream.limit rejects a negative size, which would fail every refresh; treat it as 0.
+    topNShuffleDataNumber =
+        Math.max(0, conf.getInteger(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER));
+    topNShuffleDataTaskRefreshInterval =
+        conf.getInteger(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL);
+    shuffleTaskManager = taskManager;
+    this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage;
+    this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
+    this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
+    this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
+    // Named daemon thread, so that a missing stop() call cannot keep the JVM alive.
+    this.scheduler =
+        Executors.newScheduledThreadPool(
+            1,
+            runnable -> {
+              Thread thread = new Thread(runnable, "TopNShuffleDataSizeOfAppCalcTask");
+              thread.setDaemon(true);
+              return thread;
+            });
+  }
+
+  /**
+   * Recomputes the four rankings and republishes the corresponding gauges.
+   *
+   * <p>Exceptions are logged instead of propagated: once a task scheduled with {@code
+   * scheduleAtFixedRate} throws, it is never run again, which would freeze these metrics.
+   */
+  private void calcTopNShuffleDataSize() {
+    try {
+      publish(
+          gaugeTotalDataSize, calcTopNTotalDataSizeTaskInfo(), ShuffleTaskInfo::getTotalDataSize);
+      publish(
+          gaugeInMemoryDataSize,
+          calcTopNInMemoryDataSizeTaskInfo(),
+          ShuffleTaskInfo::getInMemoryDataSize);
+      publish(
+          gaugeOnLocalFileDataSize,
+          calcTopNOnLocalFileDataSizeTaskInfo(),
+          ShuffleTaskInfo::getOnLocalFileDataSize);
+      publish(
+          gaugeOnHadoopDataSize,
+          calcTopNOnHadoopDataSizeTaskInfo(),
+          ShuffleTaskInfo::getOnHadoopDataSize);
+    } catch (Exception e) {
+      LOG.warn("Failed to refresh the TopN shuffle data size metrics of apps", e);
+    }
+  }
+
+  /** Replaces all samples of {@code gauge} with one sample per entry of {@code topN}. */
+  private static void publish(
+      Gauge gauge,
+      List<Map.Entry<String, ShuffleTaskInfo>> topN,
+      ToLongFunction<ShuffleTaskInfo> sizeOf) {
+    if (gauge == null) {
+      // The ShuffleServerMetrics gauges have not been created yet, so there is nothing to update.
+      return;
+    }
+    gauge.clear();
+    for (Map.Entry<String, ShuffleTaskInfo> entry : topN) {
+      gauge.labels(entry.getKey()).set(sizeOf.applyAsLong(entry.getValue()));
+    }
+  }
+
+  /** Returns up to N entries with the largest total data size, in descending order. */
+  public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNTotalDataSizeTaskInfo() {
+    return calcTopN(ShuffleTaskInfo::getTotalDataSize);
+  }
+
+  /** Returns up to N entries with the largest in-memory data size, in descending order. */
+  public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNInMemoryDataSizeTaskInfo() {
+    return calcTopN(ShuffleTaskInfo::getInMemoryDataSize);
+  }
+
+  /** Returns up to N entries with the largest local-file data size, in descending order. */
+  public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNOnLocalFileDataSizeTaskInfo() {
+    return calcTopN(ShuffleTaskInfo::getOnLocalFileDataSize);
+  }
+
+  /** Returns up to N entries with the largest Hadoop data size, in descending order. */
+  public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNOnHadoopDataSizeTaskInfo() {
+    return calcTopN(ShuffleTaskInfo::getOnHadoopDataSize);
+  }
+
+  /**
+   * Ranks all registered apps by {@code sizeOf} in descending order and keeps the first N.
+   *
+   * <p>Each size is read exactly once, before sorting. The underlying counters change
+   * concurrently, and a comparator that observes different values for the same element can make
+   * the sort fail with "Comparison method violates its general contract!".
+   */
+  private List<Map.Entry<String, ShuffleTaskInfo>> calcTopN(
+      ToLongFunction<ShuffleTaskInfo> sizeOf) {
+    return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+        .map(
+            entry ->
+                new AbstractMap.SimpleImmutableEntry<>(
+                    entry, sizeOf.applyAsLong(entry.getValue())))
+        .sorted((a, b) -> Long.compare(b.getValue(), a.getValue()))
+        .limit(topNShuffleDataNumber)
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /** Starts refreshing the gauges immediately and then every configured interval. */
+  public void start() {
+    LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule.");
+    this.scheduler.scheduleAtFixedRate(
+        this::calcTopNShuffleDataSize,
+        0,
+        topNShuffleDataTaskRefreshInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the refresh task, forcing termination if it does not finish within one second. */
+  public void stop() {
+    this.scheduler.shutdown();
+    try {
+      if (!this.scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
+        this.scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      this.scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 6d59b03b0..a5824a5b0 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -61,8 +61,7 @@ public class ShuffleServerMetricsTest {
     ssc.set(ShuffleServerConf.RPC_SERVER_PORT, 12346);
     ssc.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("tmp"));
     ssc.set(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
-    ssc.setString(
-        ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+    ssc.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     ssc.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "fake.coordinator:123");
     ssc.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
     shuffleServer = new ShuffleServer(ssc);
diff --git a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
new file mode 100644
index 000000000..3920f9fd6
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
+import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class TopNShuffleDataSizeOfAppCalcTaskTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTaskTest.class);
+
+  protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();
+  private static final Long EVENT_THRESHOLD_SIZE = 2048L;
+  protected static final int SHUFFLE_SERVER_PORT = 20001;
+  static @TempDir File tempDir;
+
+  protected static final String LOCALHOST;
+
+  static {
+    try {
+      LOCALHOST = RssUtils.getHostIp();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static final int COORDINATOR_PORT_1 = 19999;
+  protected static final int NETTY_PORT = 21000;
+  protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" + COORDINATOR_PORT_1;
+
+  protected static ShuffleServerConf getShuffleServerConf() throws Exception {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT);
+    serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
+    serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
+    serverConf.setString("rss.server.buffer.capacity", "671088640");
+    serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
+    serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
+    serverConf.setString("rss.server.read.buffer.capacity", "335544320");
+    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+    serverConf.setString("rss.server.heartbeat.delay", "1000");
+    serverConf.setString("rss.server.heartbeat.interval", "1000");
+    serverConf.setInteger("rss.jetty.http.port", 18080);
+    serverConf.setInteger("rss.jetty.corePool.size", 64);
+    serverConf.setInteger("rss.rpc.executor.size", 10);
+    serverConf.setString("rss.server.hadoop.dfs.replication", "2");
+    serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
+    serverConf.setBoolean("rss.server.health.check.enable", false);
+    serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L);
+    serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT);
+    serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
+    return serverConf;
+  }
+
+  @BeforeAll
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File dataDir1 = new File(tmpDir, "data1");
+    String basePath = dataDir1.getAbsolutePath();
+    shuffleServerConf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+    shuffleServerConf.set(
+        ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE);
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
+    shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L);
+    shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL, 700);
+    shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER, 5);
+
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  protected static void createShuffleServer(ShuffleServerConf serverConf) throws Exception {
+    shuffleServers.add(new ShuffleServer(serverConf));
+  }
+
+  public static void startServers() throws Exception {
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServer.start();
+    }
+  }
+
+  private void registerAndRequireBuffer(String appId, int length) {
+    ShuffleServerGrpcClient shuffleServerClient =
+        new ShuffleServerGrpcClient("localhost", SHUFFLE_SERVER_PORT);
+    int shuffleId = 0;
+    int partitionId = 0;
+    List<PartitionRange> partitionIds = Lists.newArrayList(new PartitionRange(0, 3));
+
+    RssRegisterShuffleRequest registerShuffleRequest =
+        new RssRegisterShuffleRequest(appId, shuffleId, partitionIds, "");
+    RssRegisterShuffleResponse registerResponse =
+        shuffleServerClient.registerShuffle(registerShuffleRequest);
+    assertSame(StatusCode.SUCCESS, registerResponse.getStatusCode());
+
+    List<ShuffleBlockInfo> blockInfos =
+        Lists.newArrayList(
+            new ShuffleBlockInfo(
+                shuffleId,
+                partitionId,
+                0,
+                length,
+                0,
+                new byte[] {},
+                Lists.newArrayList(),
+                0,
+                100,
+                0));
+
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(partitionId, blockInfos);
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
+    shuffleToBlocks.put(shuffleId, partitionToBlocks);
+
+    RssSendShuffleDataRequest sendShuffleDataRequest =
+        new RssSendShuffleDataRequest(appId, 1, 1000, shuffleToBlocks);
+    RssSendShuffleDataResponse response =
+        shuffleServerClient.sendShuffleData(sendShuffleDataRequest);
+    assertSame(StatusCode.SUCCESS, response.getStatusCode());
+    shuffleServerClient.close();
+  }
+
+  @Test
+  public void testTopNShuffleDataSizeOfAppCalcTask() throws Exception {
+    // There are 6 apps, but TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER limits each ranking to 5.
+    registerAndRequireBuffer("application_id_1", 1000);
+    registerAndRequireBuffer("application_id_2", 2000);
+    registerAndRequireBuffer("application_id_3", 3000);
+    registerAndRequireBuffer("application_id_4", 4000);
+    registerAndRequireBuffer("application_id_5", 5000);
+    registerAndRequireBuffer("application_id_6", 6000);
+
+    // Wait longer than the 700 ms refresh interval so at least one refresh sees all six apps.
+    Thread.sleep(1500);
+    String content = TestUtils.httpGet("http://127.0.0.1:18080/metrics/server");
+    LOG.info(content);
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode actualObj = mapper.readTree(content);
+    JsonNode metricsNode = actualObj.get("metrics");
+    Set<String> topNTotalDataSizeApps = new HashSet<>();
+    Set<String> topNInMemoryDataSizeApps = new HashSet<>();
+    for (int i = 0; i < metricsNode.size(); i++) {
+      JsonNode metricsName = metricsNode.get(i).get("name");
+      if (ShuffleServerMetrics.TOPN_OF_TOTAL_DATA_SIZE_FOR_APP.equals(metricsName.textValue())) {
+        Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields();
+        while (it.hasNext()) {
+          Map.Entry<String, JsonNode> entry = it.next();
+          if ("labelValues".equalsIgnoreCase(entry.getKey())) {
+            topNTotalDataSizeApps.add(entry.getValue().toString());
+          }
+        }
+      }
+      if (ShuffleServerMetrics.TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP.equals(
+          metricsName.textValue())) {
+        Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields();
+        while (it.hasNext()) {
+          Map.Entry<String, JsonNode> entry = it.next();
+          if ("labelValues".equalsIgnoreCase(entry.getKey())) {
+            topNInMemoryDataSizeApps.add(entry.getValue().toString());
+          }
+        }
+      }
+    }
+
+    Set<String> expectedTopNApps =
+        Sets.newHashSet(
+            "[\"application_id_6\"]",
+            "[\"application_id_5\"]",
+            "[\"application_id_4\"]",
+            "[\"application_id_3\"]",
+            "[\"application_id_2\"]");
+    assertEquals(expectedTopNApps, topNTotalDataSizeApps);
+    assertEquals(expectedTopNApps, topNInMemoryDataSizeApps);
+  }
+
+  @AfterAll
+  public static void shutdownServers() throws Exception {
+
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServer.stopServer();
+    }
+    shuffleServers = Lists.newArrayList();
+    ShuffleServerMetrics.clear();
+  }
+}

Reply via email to