This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b4ed85429 [#1903] improvement(server): Add some metrics for shuffle
server (#1904)
b4ed85429 is described below
commit b4ed85429c601216508d48b6e6c9b848f032b797
Author: kqhzz <[email protected]>
AuthorDate: Thu Jul 18 02:38:28 2024 +0800
[#1903] improvement(server): Add some metrics for shuffle server (#1904)
### What changes were proposed in this pull request?
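Add storage-related metrics to the shuffle server: counters for total deleted data size, total flushed file count and total deleted file count, plus gauges for storage used bytes and the current flushed file count.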
### Why are the changes needed?
These metrics make it easier to understand the storage status of the shuffle
server.
Fix: #1903
### Does this PR introduce _any_ user-facing change?
No.
---
.../common/storage/ApplicationStorageInfo.java | 56 ++++++++++++++++++++++
.../uniffle/server/ShuffleServerMetrics.java | 15 ++++++
.../server/storage/HadoopStorageManager.java | 1 +
.../server/storage/LocalStorageManager.java | 1 +
.../server/storage/SingleStorageManager.java | 27 +++++++++++
.../uniffle/storage/common/AbstractStorage.java | 5 ++
.../org/apache/uniffle/storage/common/Storage.java | 2 +
7 files changed, 107 insertions(+)
diff --git
a/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java
b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java
new file mode 100644
index 000000000..06ddcdf1f
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+ /**
+  * Storage usage accounting for a single application: a file count and a used-bytes count.
+  *
+  * <p>Both counters are backed by {@link AtomicLong}, so they may be read and incremented
+  * concurrently without external locking. The {@code appId} field is a plain (non-volatile)
+  * field and is not protected the same way.
+  */
+ public class ApplicationStorageInfo {
+   private String appId;
+   // Never reassigned; final guarantees every reader sees the same counter instance.
+   private final AtomicLong fileNum = new AtomicLong();
+   private final AtomicLong usedBytes = new AtomicLong();
+
+   /**
+    * Creates a record for {@code appId} with zero files and zero used bytes.
+    *
+    * @param appId id of the application this record belongs to
+    */
+   public ApplicationStorageInfo(String appId) {
+     this.appId = appId;
+   }
+
+   /** Returns the id of the application this record belongs to. */
+   public String getAppId() {
+     return appId;
+   }
+
+   /**
+    * Replaces the application id.
+    *
+    * <p>NOTE: the field is not volatile; other threads are only guaranteed to observe the new
+    * value through some external happens-before relationship.
+    *
+    * @param appId the new application id
+    */
+   public void setAppId(String appId) {
+     this.appId = appId;
+   }
+
+   /** Returns the current file count. */
+   public long getFileNum() {
+     return fileNum.get();
+   }
+
+   /**
+    * Atomically adds {@code delta} to the file count.
+    *
+    * @param delta number of files to add (a negative value decreases the count)
+    */
+   public void incFileNum(long delta) {
+     fileNum.addAndGet(delta);
+   }
+
+   /** Returns the current used-bytes count. */
+   public long getUsedBytes() {
+     return usedBytes.get();
+   }
+
+   /**
+    * Atomically adds {@code delta} to the used-bytes count.
+    *
+    * @param delta number of bytes to add (a negative value decreases the count)
+    */
+   public void incUsedBytes(long delta) {
+     usedBytes.addAndGet(delta);
+   }
+ }
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 8eada73c5..309cee553 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -39,6 +39,11 @@ public class ShuffleServerMetrics {
private static final String TOTAL_RECEIVED_DATA = "total_received_data";
private static final String TOTAL_WRITE_DATA = "total_write_data";
+ private static final String TOTAL_DELETE_DATA = "total_delete_data";
+ private static final String TOTAL_FLUSH_FILE_NUM = "total_flush_file_num";
+ private static final String TOTAL_DELETE_FILE_NUM = "total_delete_file_num";
+ private static final String STORAGE_USED_BYTES = "storage_used_bytes";
+ private static final String FLUSH_FILE_NUM = "flush_file_num";
private static final String TOTAL_WRITE_BLOCK = "total_write_block";
private static final String WRITE_BLOCK_SIZE = "write_block_size";
private static final String TOTAL_WRITE_TIME = "total_write_time";
@@ -155,6 +160,11 @@ public class ShuffleServerMetrics {
public static Counter.Child counterTotalReceivedDataSize;
public static Counter.Child counterTotalWriteDataSize;
+ public static Counter.Child counterTotalDeleteDataSize;
+ public static Counter.Child counterTotalFlushFileNum;
+ public static Counter.Child counterTotalDeleteFileNum;
+ public static Gauge.Child gaugeStorageUsedBytes;
+ public static Gauge.Child gaugeFlushFileNum;
public static Counter.Child counterTotalWriteBlockSize;
public static Histogram appHistogramWriteBlockSize;
public static Counter.Child counterTotalWriteTime;
@@ -337,6 +347,11 @@ public class ShuffleServerMetrics {
private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalReceivedDataSize =
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
counterTotalWriteDataSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
+ counterTotalDeleteDataSize =
metricsManager.addLabeledCounter(TOTAL_DELETE_DATA);
+ counterTotalFlushFileNum =
metricsManager.addLabeledCounter(TOTAL_FLUSH_FILE_NUM);
+ counterTotalDeleteFileNum =
metricsManager.addLabeledCounter(TOTAL_DELETE_FILE_NUM);
+ gaugeStorageUsedBytes = metricsManager.addLabeledGauge(STORAGE_USED_BYTES);
+ gaugeFlushFileNum = metricsManager.addLabeledGauge(FLUSH_FILE_NUM);
counterTotalWriteBlockSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
appHistogramWriteBlockSize =
metricsManager.addHistogram(
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 90d62a1ed..66c4349cf 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -148,6 +148,7 @@ public class HadoopStorageManager extends
SingleStorageManager {
}
}
deleteHandler.delete(deletePaths.toArray(new String[0]), appId,
event.getUser());
+ removeAppStorageInfo(event);
} else {
LOG.warn("Storage gotten is null when removing resources for event: {}",
event);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index c55fe9060..488629544 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -326,6 +326,7 @@ public class LocalStorageManager extends
SingleStorageManager {
.collect(Collectors.toList());
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]),
appId, user);
+ removeAppStorageInfo(event);
}
private void cleanupStorageSelectionCache(PurgeEvent event) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 6acead608..fb974d5f5 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server.storage;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
@@ -26,10 +27,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.storage.ApplicationStorageInfo;
+import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageWriteMetrics;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -42,6 +46,8 @@ public abstract class SingleStorageManager implements
StorageManager {
private final long eventSizeThresholdL1;
private final long eventSizeThresholdL2;
private final long eventSizeThresholdL3;
+ protected final Map<String, ApplicationStorageInfo> appStorageInfoMap =
+ JavaUtils.newConcurrentMap();
public SingleStorageManager(ShuffleServerConf conf) {
writeSlowThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_WRITE_SLOW_THRESHOLD);
@@ -92,6 +98,16 @@ public abstract class SingleStorageManager implements
StorageManager {
} else {
ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
}
+ String appId = event.getAppId();
+ ApplicationStorageInfo appStorage =
+ appStorageInfoMap.computeIfAbsent(appId, id -> new
ApplicationStorageInfo(appId));
+ appStorage.incUsedBytes(event.getSize());
+ ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getSize());
+ if (event.getUnderStorage().containsWriteHandler(appId)) {
+ appStorage.incFileNum(1);
+ ShuffleServerMetrics.gaugeFlushFileNum.inc();
+ ShuffleServerMetrics.counterTotalFlushFileNum.inc();
+ }
Storage storage = event.getUnderStorage();
if (storage != null) {
storage.updateWriteMetrics(metrics);
@@ -149,4 +165,15 @@ public abstract class SingleStorageManager implements
StorageManager {
public void stop() {
// do nothing
}
+
+ public void removeAppStorageInfo(PurgeEvent event) {
+ String appId = event.getAppId();
+ ApplicationStorageInfo info = appStorageInfoMap.remove(appId);
+ if (info != null) {
+ ShuffleServerMetrics.gaugeStorageUsedBytes.dec(info.getUsedBytes());
+ ShuffleServerMetrics.gaugeFlushFileNum.dec(info.getFileNum());
+ ShuffleServerMetrics.counterTotalDeleteDataSize.inc(info.getUsedBytes());
+ ShuffleServerMetrics.counterTotalDeleteFileNum.inc(info.getFileNum());
+ }
+ }
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index f8bf8c9b4..ef2b14b80 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -68,6 +68,11 @@ public abstract class AbstractStorage implements Storage {
protected abstract ServerReadHandler
newReadHandler(CreateShuffleReadHandlerRequest request);
+ @Override
+ public boolean containsWriteHandler(String appId) {
+ return writerHandlers.containsKey(appId);
+ }
+
public boolean containsWriteHandler(String appId, int shuffleId, int
partition) {
Map<String, ShuffleWriteHandler> map = writerHandlers.get(appId);
if (map == null || map.isEmpty()) {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index 43168c324..80f36a7bd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -35,6 +35,8 @@ public interface Storage {
ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest
request)
throws IOException;
+ /**
+  * Returns whether this storage currently has a write handler for the given application.
+  *
+  * @param appId application id to look up
+  * @return {@code true} if a write handler exists for {@code appId}
+  */
+ boolean containsWriteHandler(String appId);
+
ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest
request);
void removeHandlers(String appId);