This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b43bbd863 [#333] feat(server): expose metrics of TopN app bytes in one shuffle server (#1400)
b43bbd863 is described below
commit b43bbd86349c66e8384a093c3b70293f37689e33
Author: Qing <[email protected]>
AuthorDate: Fri Jan 12 13:47:38 2024 +0800
[#333] feat(server): expose metrics of TopN app bytes in one shuffle server (#1400)
### What changes were proposed in this pull request?
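Expose metrics for the top N apps by shuffle data size on each shuffle server (total, in memory, on local file, and on Hadoop); N is set by `rss.server.topN.appShuffleDataSize.number` (default 10). #333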
### Why are the changes needed?
Fix:
https://github.com/apache/incubator-uniffle/issues/333
https://github.com/apache/incubator-uniffle/issues/681
### Does this PR introduce _any_ user-facing change?
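Yes. Adds four gauge metrics labeled by `app_id` (`topN_of_total_data_size_for_app`, `topN_of_in_memory_data_size_for_app`, `topN_of_on_localfile_data_size_for_app`, `topN_of_on_hadoop_data_size_for_app`) and two server configs: `rss.server.topN.appShuffleDataSize.number` (default 10) and `rss.server.topN.appShuffleDataSize.refreshIntervalMs` (default 1000).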
### How was this patch tested?
Unit tests: added `TopNShuffleDataSizeOfAppCalcTaskTest` and updated `ShuffleServerMetricsTest`.
---
.../apache/uniffle/server/ShuffleFlushManager.java | 21 +-
.../org/apache/uniffle/server/ShuffleServer.java | 3 +
.../apache/uniffle/server/ShuffleServerConf.java | 13 ++
.../uniffle/server/ShuffleServerMetrics.java | 41 ++++
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 48 ++++-
.../apache/uniffle/server/ShuffleTaskManager.java | 12 ++
.../server/TopNShuffleDataSizeOfAppCalcTask.java | 145 +++++++++++++
.../uniffle/server/ShuffleServerMetricsTest.java | 3 +-
.../TopNShuffleDataSizeOfAppCalcTaskTest.java | 238 +++++++++++++++++++++
9 files changed, 516 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index eec574179..abf9bddf7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
@@ -104,13 +105,23 @@ public class ShuffleFlushManager {
private void recordSuccess(ShuffleDataFlushEvent event, long start) {
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
ShuffleServerMetrics.incStorageSuccessCounter(event.getUnderStorage().getStorageHost());
- event.doCleanup();
- if (shuffleServer != null) {
- if (LOG.isDebugEnabled()) {
- long duration = System.currentTimeMillis() - start;
- LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+
+ ShuffleTaskInfo shuffleTaskInfo =
+ shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+ if (null != shuffleTaskInfo) {
+ String storageHost = event.getUnderStorage().getStorageHost();
+ if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
+ shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
+ } else {
+ shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
}
}
+
+ event.doCleanup();
+ if (LOG.isDebugEnabled()) {
+ long duration = System.currentTimeMillis() - start;
+ LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+ }
}
public void processEvent(ShuffleDataFlushEvent event) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index d7b3bbcb2..eac948470 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -193,6 +193,9 @@ public class ShuffleServer {
if (executorService != null) {
executorService.shutdownNow();
}
+ if (shuffleTaskManager != null) {
+ shuffleTaskManager.stop();
+ }
running = false;
LOG.info("RPC Server Stopped!");
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 0af62d420..77287b2a7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -538,6 +538,19 @@ public class ShuffleServerConf extends RssBaseConf {
+ "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB."
+ "Default is 0, OS will dynamically adjust the buf size.");
+ public static final ConfigOption<Integer> TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER =
+ ConfigOptions.key("rss.server.topN.appShuffleDataSize.number")
+ .intType()
+ .defaultValue(10)
+ .withDescription("number of topN shuffle data size of app level.");
+
+ public static final ConfigOption<Integer> TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL =
+ ConfigOptions.key("rss.server.topN.appShuffleDataSize.refreshIntervalMs")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "refresh interval in ms for TopN shuffle data size of app level calc task.");
+
public static final ConfigOption<Integer> SUMMARY_METRIC_WAIT_QUEUE_SIZE =
ConfigOptions.key("rss.server.summary.metric.wait.queue.size")
.intType()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index dde9bb319..400712076 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -111,6 +111,14 @@ public class ShuffleServerMetrics {
private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
"total_remove_resource_by_shuffle_ids_time";
+ public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP = "topN_of_total_data_size_for_app";
+ public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP =
+ "topN_of_in_memory_data_size_for_app";
+ public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP =
+ "topN_of_on_localfile_data_size_for_app";
+ public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
+ "topN_of_on_hadoop_data_size_for_app";
+
public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
@@ -170,6 +178,11 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeAppNum;
public static Gauge.Child gaugeTotalPartitionNum;
+ public static Gauge gaugeTotalDataSizeUsage;
+ public static Gauge gaugeInMemoryDataSizeUsage;
+ public static Gauge gaugeOnDiskDataSizeUsage;
+ public static Gauge gaugeOnHadoopDataSizeUsage;
+
public static Counter counterRemoteStorageTotalWrite;
public static Counter counterRemoteStorageRetryWrite;
public static Counter counterRemoteStorageFailedWrite;
@@ -347,5 +360,33 @@ public class ShuffleServerMetrics {
summaryTotalRemoveResourceTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
summaryTotalRemoveResourceByShuffleIdsTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
+
+ gaugeTotalDataSizeUsage =
+ Gauge.build()
+ .name(TOPN_OF_TOTAL_DATA_SIZE_FOR_APP)
+ .help("top N of total shuffle data size for app level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
+
+ gaugeInMemoryDataSizeUsage =
+ Gauge.build()
+ .name(TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP)
+ .help("top N of in memory shuffle data size for app level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
+
+ gaugeOnDiskDataSizeUsage =
+ Gauge.build()
+ .name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP)
+ .help("top N of on disk shuffle data size for app level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
+
+ gaugeOnHadoopDataSizeUsage =
+ Gauge.build()
+ .name(TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP)
+ .help("top N of on hadoop shuffle data size for app level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
}
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 10a7fb8fc..e657ec4db 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -50,7 +50,11 @@ public class ShuffleTaskInfo {
private AtomicReference<String> user;
- private AtomicLong totalDataSize = new AtomicLong(0);
+ private final AtomicLong totalDataSize = new AtomicLong(0);
+ private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+ private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
+ private final AtomicLong onHadoopDataSize = new AtomicLong(0);
+
/** shuffleId -> partitionId -> partition shuffle data size */
private Map<Integer, Map<Integer, Long>> partitionDataSizes;
/** shuffleId -> huge partitionIds set */
@@ -115,6 +119,7 @@ public class ShuffleTaskInfo {
public long addPartitionDataSize(int shuffleId, int partitionId, long delta) {
totalDataSize.addAndGet(delta);
+ inMemoryDataSize.addAndGet(delta);
partitionDataSizes.computeIfAbsent(shuffleId, key -> JavaUtils.newConcurrentMap());
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
partitions.putIfAbsent(partitionId, 0L);
@@ -125,6 +130,28 @@ public class ShuffleTaskInfo {
return totalDataSize.get();
}
+ public long getInMemoryDataSize() {
+ return inMemoryDataSize.get();
+ }
+
+ public long addOnLocalFileDataSize(long delta) {
+ inMemoryDataSize.addAndGet(-delta);
+ return onLocalFileDataSize.addAndGet(delta);
+ }
+
+ public long getOnLocalFileDataSize() {
+ return onLocalFileDataSize.get();
+ }
+
+ public long addOnHadoopDataSize(long delta) {
+ inMemoryDataSize.addAndGet(-delta);
+ return onHadoopDataSize.addAndGet(delta);
+ }
+
+ public long getOnHadoopDataSize() {
+ return onHadoopDataSize.get();
+ }
+
public long getPartitionDataSize(int shuffleId, int partitionId) {
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
if (partitions == null) {
@@ -166,4 +193,23 @@ public class ShuffleTaskInfo {
partitionId);
}
}
+
+ @Override
+ public String toString() {
+ return "ShuffleTaskInfo{"
+ + "appId='"
+ + appId
+ + '\''
+ + ", totalDataSize="
+ + totalDataSize
+ + ", inMemoryDataSize="
+ + inMemoryDataSize
+ + ", onLocalFileDataSize="
+ + onLocalFileDataSize
+ + ", onHadoopDataSize="
+ + onHadoopDataSize
+ + ", partitionDataSizes="
+ + partitionDataSizes
+ + '}';
+ }
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2148aab9a..90928f3b8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -88,6 +88,7 @@ public class ShuffleTaskManager {
private final ScheduledExecutorService expiredAppCleanupExecutorService;
private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
private ScheduledExecutorService triggerFlushExecutorService;
+ private final TopNShuffleDataSizeOfAppCalcTask topNShuffleDataSizeOfAppCalcTask;
private final StorageManager storageManager;
private AtomicLong requireBufferId = new AtomicLong(0);
private ShuffleServerConf conf;
@@ -207,6 +208,9 @@ public class ShuffleTaskManager {
clearResourceThread = new Thread(clearResourceRunnable);
clearResourceThread.setName("clearResourceThread");
clearResourceThread.setDaemon(true);
+
+ topNShuffleDataSizeOfAppCalcTask = new TopNShuffleDataSizeOfAppCalcTask(this, conf);
+ topNShuffleDataSizeOfAppCalcTask.start();
}
private Lock getAppLock(String appId) {
@@ -839,6 +843,14 @@ public class ShuffleTaskManager {
}
}
+ public Map<String, ShuffleTaskInfo> getShuffleTaskInfos() {
+ return shuffleTaskInfos;
+ }
+
+ public void stop() {
+ topNShuffleDataSizeOfAppCalcTask.stop();
+ }
+
public void start() {
clearResourceThread.start();
}
diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
new file mode 100644
index 000000000..1b22c64fc
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import io.prometheus.client.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopNShuffleDataSizeOfAppCalcTask {
+ private static final Logger LOG = LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class);
+
+ private final int topNShuffleDataNumber;
+ private final int topNShuffleDataTaskRefreshInterval;
+
+ private final Gauge gaugeTotalDataSize;
+ private final Gauge gaugeInMemoryDataSize;
+ private final Gauge gaugeOnLocalFileDataSize;
+ private final Gauge gaugeOnHadoopDataSize;
+
+ private final ShuffleTaskManager shuffleTaskManager;
+ private final ScheduledExecutorService scheduler;
+
+ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleServerConf conf) {
+ topNShuffleDataNumber = conf.getInteger(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER);
+ topNShuffleDataTaskRefreshInterval =
+ conf.getInteger(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL);
+ shuffleTaskManager = taskManager;
+ this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage;
+ this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
+ this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
+ this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
+ this.scheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ private void calcTopNShuffleDataSize() {
+ List<Map.Entry<String, ShuffleTaskInfo>> topNTaskInfo = calcTopNTotalDataSizeTaskInfo();
+ gaugeTotalDataSize.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeTotalDataSize.labels(taskInfo.getKey()).set(taskInfo.getValue().getTotalDataSize());
+ }
+
+ topNTaskInfo = calcTopNInMemoryDataSizeTaskInfo();
+ gaugeInMemoryDataSize.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeInMemoryDataSize
+ .labels(taskInfo.getKey())
+ .set(taskInfo.getValue().getInMemoryDataSize());
+ }
+
+ topNTaskInfo = calcTopNOnLocalFileDataSizeTaskInfo();
+ gaugeOnLocalFileDataSize.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeOnLocalFileDataSize
+ .labels(taskInfo.getKey())
+ .set(taskInfo.getValue().getOnLocalFileDataSize());
+ }
+
+ topNTaskInfo = calcTopNOnHadoopDataSizeTaskInfo();
+ gaugeOnHadoopDataSize.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeOnHadoopDataSize
+ .labels(taskInfo.getKey())
+ .set(taskInfo.getValue().getOnHadoopDataSize());
+ }
+ }
+
+ public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNTotalDataSizeTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(e2.getValue().getTotalDataSize(), e1.getValue().getTotalDataSize()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
+ public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNInMemoryDataSizeTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(
+ e2.getValue().getInMemoryDataSize(), e1.getValue().getInMemoryDataSize()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
+ public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNOnLocalFileDataSizeTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(
+ e2.getValue().getOnLocalFileDataSize(), e1.getValue().getOnLocalFileDataSize()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
+ public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNOnHadoopDataSizeTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(
+ e2.getValue().getOnHadoopDataSize(), e1.getValue().getOnHadoopDataSize()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
+ public void start() {
+ LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule.");
+ this.scheduler.scheduleAtFixedRate(
+ this::calcTopNShuffleDataSize,
+ 0,
+ topNShuffleDataTaskRefreshInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ this.scheduler.shutdown();
+ try {
+ this.scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 6d59b03b0..a5824a5b0 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -61,8 +61,7 @@ public class ShuffleServerMetricsTest {
ssc.set(ShuffleServerConf.RPC_SERVER_PORT, 12346);
ssc.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("tmp"));
ssc.set(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
- ssc.setString(
- ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+ ssc.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
ssc.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "fake.coordinator:123");
ssc.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
shuffleServer = new ShuffleServer(ssc);
diff --git a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
new file mode 100644
index 000000000..3920f9fd6
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
+import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TopNShuffleDataSizeOfAppCalcTaskTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTaskTest.class);
+
+ protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();
+ private static final Long EVENT_THRESHOLD_SIZE = 2048L;
+ protected static final int SHUFFLE_SERVER_PORT = 20001;
+ static @TempDir File tempDir;
+
+ protected static final String LOCALHOST;
+
+ static {
+ try {
+ LOCALHOST = RssUtils.getHostIp();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static final int COORDINATOR_PORT_1 = 19999;
+ protected static final int NETTY_PORT = 21000;
+ protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" + COORDINATOR_PORT_1;
+
+ protected static ShuffleServerConf getShuffleServerConf() throws Exception {
+ ShuffleServerConf serverConf = new ShuffleServerConf();
+ serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT);
+ serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
+ serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
+ serverConf.setString("rss.server.buffer.capacity", "671088640");
+ serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
+ serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
+ serverConf.setString("rss.server.read.buffer.capacity", "335544320");
+ serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+ serverConf.setString("rss.server.heartbeat.delay", "1000");
+ serverConf.setString("rss.server.heartbeat.interval", "1000");
+ serverConf.setInteger("rss.jetty.http.port", 18080);
+ serverConf.setInteger("rss.jetty.corePool.size", 64);
+ serverConf.setInteger("rss.rpc.executor.size", 10);
+ serverConf.setString("rss.server.hadoop.dfs.replication", "2");
+ serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
+ serverConf.setBoolean("rss.server.health.check.enable", false);
+ serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L);
+ serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT);
+ serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
+ return serverConf;
+ }
+
+ @BeforeAll
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ File dataDir1 = new File(tmpDir, "data1");
+ String basePath = dataDir1.getAbsolutePath();
+ shuffleServerConf.setString(
+ ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+ shuffleServerConf.set(
+ ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
+ shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
+ shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+ shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L);
+ shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL, 700);
+ shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER, 5);
+
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ protected static void createShuffleServer(ShuffleServerConf serverConf) throws Exception {
+ shuffleServers.add(new ShuffleServer(serverConf));
+ }
+
+ public static void startServers() throws Exception {
+ for (ShuffleServer shuffleServer : shuffleServers) {
+ shuffleServer.start();
+ }
+ }
+
+ private void registerAndRequireBuffer(String appId, int length) {
+ ShuffleServerGrpcClient shuffleServerClient =
+ new ShuffleServerGrpcClient("localhost", SHUFFLE_SERVER_PORT);
+ int shuffleId = 0;
+ int partitionId = 0;
+ List<PartitionRange> partitionIds = Lists.newArrayList(new PartitionRange(0, 3));
+
+ RssRegisterShuffleRequest registerShuffleRequest =
+ new RssRegisterShuffleRequest(appId, shuffleId, partitionIds, "");
+ RssRegisterShuffleResponse registerResponse =
+ shuffleServerClient.registerShuffle(registerShuffleRequest);
+ assertSame(StatusCode.SUCCESS, registerResponse.getStatusCode());
+
+ List<ShuffleBlockInfo> blockInfos =
+ Lists.newArrayList(
+ new ShuffleBlockInfo(
+ shuffleId,
+ partitionId,
+ 0,
+ length,
+ 0,
+ new byte[] {},
+ Lists.newArrayList(),
+ 0,
+ 100,
+ 0));
+
+ Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+ partitionToBlocks.put(partitionId, blockInfos);
+ Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
+ shuffleToBlocks.put(shuffleId, partitionToBlocks);
+
+ RssSendShuffleDataRequest sendShuffleDataRequest =
+ new RssSendShuffleDataRequest(appId, 1, 1000, shuffleToBlocks);
+ RssSendShuffleDataResponse response =
+ shuffleServerClient.sendShuffleData(sendShuffleDataRequest);
+ assertSame(StatusCode.SUCCESS, response.getStatusCode());
+ }
+
+ @Test
+ public void testTopNShuffleDataSizeOfAppCalcTask() throws Exception {
+ // Here is 6 app, but config max top n number is 5
+ registerAndRequireBuffer("application_id_1", 1000);
+ registerAndRequireBuffer("application_id_2", 2000);
+ registerAndRequireBuffer("application_id_3", 3000);
+ registerAndRequireBuffer("application_id_4", 4000);
+ registerAndRequireBuffer("application_id_5", 5000);
+ registerAndRequireBuffer("application_id_6", 6000);
+
+ Thread.sleep(500);
+ String content = TestUtils.httpGet("http://127.0.0.1:18080/metrics/server");
+ LOG.info(content);
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(content);
+ JsonNode metricsNode = actualObj.get("metrics");
+ Set<String> topNTotalDataSizeApps = new HashSet<>();
+ Set<String> topNInMemoryDataSizeApps = new HashSet<>();
+ for (int i = 0; i < metricsNode.size(); i++) {
+ JsonNode metricsName = metricsNode.get(i).get("name");
+ if (ShuffleServerMetrics.TOPN_OF_TOTAL_DATA_SIZE_FOR_APP.equals(metricsName.textValue())) {
+ Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields();
+ while (it.hasNext()) {
+ Map.Entry<String, JsonNode> entry = it.next();
+ if ("labelValues".equalsIgnoreCase(entry.getKey())) {
+ topNTotalDataSizeApps.add(entry.getValue().toString());
+ }
+ }
+ }
+ if (ShuffleServerMetrics.TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP.equals(
+ metricsName.textValue())) {
+ Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields();
+ while (it.hasNext()) {
+ Map.Entry<String, JsonNode> entry = it.next();
+ if ("labelValues".equalsIgnoreCase(entry.getKey())) {
+ topNInMemoryDataSizeApps.add(entry.getValue().toString());
+ }
+ }
+ }
+ }
+
+ Set<String> expectedTopNApps =
+ Sets.newHashSet(
+ "[\"application_id_6\"]",
+ "[\"application_id_5\"]",
+ "[\"application_id_4\"]",
+ "[\"application_id_3\"]",
+ "[\"application_id_2\"]");
+ assertTrue(
+ expectedTopNApps.containsAll(topNTotalDataSizeApps)
+ && expectedTopNApps.size() == topNTotalDataSizeApps.size());
+ assertTrue(
+ expectedTopNApps.containsAll(topNInMemoryDataSizeApps)
+ && expectedTopNApps.size() == topNInMemoryDataSizeApps.size());
+ }
+
+ @AfterAll
+ public static void shutdownServers() throws Exception {
+
+ for (ShuffleServer shuffleServer : shuffleServers) {
+ shuffleServer.stopServer();
+ }
+ shuffleServers = Lists.newArrayList();
+ ShuffleServerMetrics.clear();
+ }
+}