This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ba2578532 [#1356] feat(server): improve expired buffers metric and log
(#1469)
ba2578532 is described below
commit ba25785324f3b19a5780eb1ff9d696717420e2b3
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jan 23 10:07:23 2024 +0800
[#1356] feat(server): improve expired buffers metric and log (#1469)
### What changes were proposed in this pull request?
Improve the expired pre-allocated buffer metric and log.
When a pre-allocated buffer expires, it means the server or client may have
problems, so the server log should show the related app to help us find
out which applications are affected.
### Why are the changes needed?
For #1356
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
---
.../apache/uniffle/server/ShuffleServerMetrics.java | 7 +++++++
.../org/apache/uniffle/server/ShuffleTaskManager.java | 19 ++++++++++++++-----
.../uniffle/server/buffer/PreAllocatedBufferInfo.java | 9 +++++++--
3 files changed, 28 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 0e305c2bb..c5aeac615 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -115,6 +115,9 @@ public class ShuffleServerMetrics {
private static final String LOCAL_FILE_EVENT_FLUSH_NUM =
"local_file_event_flush_num";
private static final String HADOOP_EVENT_FLUSH_NUM =
"hadoop_event_flush_num";
+ private static final String TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM =
+ "total_expired_preAllocated_buffer_num";
+
private static final String TOTAL_REMOVE_RESOURCE_TIME =
"total_remove_resource_time";
private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
"total_remove_resource_by_shuffle_ids_time";
@@ -205,6 +208,7 @@ public class ShuffleServerMetrics {
private static String tags;
public static Counter counterLocalFileEventFlush;
public static Counter counterHadoopEventFlush;
+ public static Counter counterPreAllocatedBufferExpired;
private static MetricsManager metricsManager;
private static boolean isRegister = false;
@@ -394,6 +398,9 @@ public class ShuffleServerMetrics {
counterLocalFileEventFlush =
metricsManager.addCounter(LOCAL_FILE_EVENT_FLUSH_NUM);
counterHadoopEventFlush =
metricsManager.addCounter(HADOOP_EVENT_FLUSH_NUM);
+ counterPreAllocatedBufferExpired =
+ metricsManager.addCounter(TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM);
+
summaryTotalRemoveResourceTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
summaryTotalRemoveResourceByShuffleIdsTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 90928f3b8..efcd56150 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -478,15 +478,15 @@ public class ShuffleTaskManager {
throw new NoBufferForHugePartitionException(errorMessage);
}
}
- return requireBuffer(requireSize);
+ return requireBuffer(appId, requireSize);
}
- public long requireBuffer(int requireSize) {
+ public long requireBuffer(String appId, int requireSize) {
if (shuffleBufferManager.requireMemory(requireSize, true)) {
long requireId = requireBufferId.incrementAndGet();
requireBufferIds.put(
requireId,
- new PreAllocatedBufferInfo(requireId, System.currentTimeMillis(),
requireSize));
+ new PreAllocatedBufferInfo(appId, requireId,
System.currentTimeMillis(), requireSize));
return requireId;
} else {
LOG.error("Failed to require buffer, require size: {}", requireSize);
@@ -494,6 +494,11 @@ public class ShuffleTaskManager {
}
}
+ public long requireBuffer(int requireSize) {
+ // appId of EMPTY means the client uses the old version that should be
upgraded.
+ return requireBuffer("EMPTY", requireSize);
+ }
+
public byte[] getFinishedBlockIds(String appId, Integer shuffleId,
Set<Integer> partitions)
throws IOException {
refreshAppId(appId);
@@ -781,9 +786,13 @@ public class ShuffleTaskManager {
// move release memory code down to here as the requiredBuffer could
be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false,
true);
- LOG.info("Remove expired preAllocatedBuffer " + requireId);
+ LOG.warn(
+ "Remove expired preAllocatedBuffer[id={}] that required by app:
{}",
+ requireId,
+ info.getAppId());
+ ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
- LOG.info("PreAllocatedBuffer[id={}] has already been removed",
requireId);
+ LOG.info("PreAllocatedBuffer[id={}] has already be used", requireId);
}
}
} catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
index 3eccafee4..351f996c3 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
@@ -18,12 +18,13 @@
package org.apache.uniffle.server.buffer;
public class PreAllocatedBufferInfo {
-
+ private String appId;
private long requireId;
private long timestamp;
private int requireSize;
- public PreAllocatedBufferInfo(long requireId, long timestamp, int
requireSize) {
+ public PreAllocatedBufferInfo(String appId, long requireId, long timestamp,
int requireSize) {
+ this.appId = appId;
this.requireId = requireId;
this.timestamp = timestamp;
this.requireSize = requireSize;
@@ -40,4 +41,8 @@ public class PreAllocatedBufferInfo {
public int getRequireSize() {
return requireSize;
}
+
+ public String getAppId() {
+ return appId;
+ }
}