This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ba81c2997 [#1645] feat(server): Add gauge metrics for reading localfile data (#1646)
ba81c2997 is described below

commit ba81c2997c4097e9b191ba0031bfe12f3b5e9c5a
Author: RickyMa <[email protected]>
AuthorDate: Tue Apr 16 17:48:40 2024 +0800

    [#1645] feat(server): Add gauge metrics for reading localfile data (#1646)
    
    ### What changes were proposed in this pull request?
    
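    Add two gauge metrics for each read path (in-memory shuffle data, local index files, and local data files):
    1. A gauge for the number of threads currently performing that kind of read.
    2. A gauge for the total buffer size currently being read on that path.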
    
    ### Why are the changes needed?
    
    Fix https://github.com/apache/incubator-uniffle/issues/1645.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../uniffle/server/ShuffleServerGrpcService.java   | 21 ++++++++++++--
 .../uniffle/server/ShuffleServerMetrics.java       | 24 ++++++++++++++++
 .../server/netty/ShuffleServerNettyHandler.java    | 32 +++++++++++++++++++---
 3 files changed, 70 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 9f8f79eb5..47a0e807e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -693,6 +693,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
         shuffleServer
             .getGrpcMetrics()
             
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
@@ -718,6 +720,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       } finally {
         if (sdr != null) {
           sdr.release();
+          ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+          ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(length);
         }
         shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
@@ -769,18 +773,23 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemory(assumedFileSize)) {
       ShuffleIndexResult shuffleIndexResult = null;
       try {
-        long start = System.currentTimeMillis();
+        final long start = System.currentTimeMillis();
         shuffleIndexResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getShuffleIndex(appId, shuffleId, partitionId, 
partitionNumPerRange, partitionNum);
-        long readTime = System.currentTimeMillis() - start;
 
         ByteBuffer data = shuffleIndexResult.getIndexData();
         ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
         
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+        
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
         GetLocalShuffleIndexResponse.Builder builder =
             
GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg);
+        long readTime = System.currentTimeMillis() - start;
+        shuffleServer
+            .getGrpcMetrics()
+            
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD, readTime);
         LOG.info(
             "Successfully getShuffleIndex cost {} ms for {}" + " bytes with 
{}",
             readTime,
@@ -808,6 +817,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       } finally {
         if (shuffleIndexResult != null) {
           shuffleIndexResult.release();
+          ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+          
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(assumedFileSize);
         }
         
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
       }
@@ -845,7 +856,6 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, 
transportTime);
       }
     }
-    long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse reply;
@@ -856,6 +866,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
       ShuffleDataResult shuffleDataResult = null;
       try {
+        final long start = System.currentTimeMillis();
         Roaring64NavigableMap expectedTaskIds = null;
         if (request.getSerializedExpectedTaskIdsBitmap() != null
             && !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
@@ -875,6 +886,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           bufferSegments = shuffleDataResult.getBufferSegments();
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+          
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
         }
         long costTime = System.currentTimeMillis() - start;
         shuffleServer
@@ -911,6 +924,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       } finally {
         if (shuffleDataResult != null) {
           shuffleDataResult.release();
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+          
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
         }
         
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
       }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f1f37369e..b4566b366 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -54,6 +54,13 @@ public class ShuffleServerMetrics {
       "localfile_flush_thread_pool_queue_size";
   private static final String FALLBACK_FLUSH_THREAD_POOL_QUEUE_SIZE =
       "fallback_flush_thread_pool_queue_size";
+  private static final String READ_LOCAL_DATA_FILE_THREAD_NUM = 
"read_local_data_file_thread_num";
+  private static final String READ_LOCAL_INDEX_FILE_THREAD_NUM = 
"read_local_index_file_thread_num";
+  private static final String READ_MEMORY_DATA_THREAD_NUM = 
"read_memory_data_thread_num";
+  private static final String READ_LOCAL_DATA_FILE_BUFFER_SIZE = 
"read_local_data_file_buffer_size";
+  private static final String READ_LOCAL_INDEX_FILE_BUFFER_SIZE =
+      "read_local_index_file_buffer_size";
+  private static final String READ_MEMORY_DATA_BUFFER_SIZE = 
"read_memory_data_buffer_size";
   private static final String TOTAL_READ_DATA = "total_read_data";
   private static final String TOTAL_READ_LOCAL_DATA_FILE = 
"total_read_local_data_file";
   private static final String TOTAL_READ_LOCAL_INDEX_FILE = 
"total_read_local_index_file";
@@ -196,6 +203,12 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeFallbackFlushThreadPoolQueueSize;
   public static Gauge.Child gaugeAppNum;
   public static Gauge.Child gaugeTotalPartitionNum;
+  public static Gauge.Child gaugeReadLocalDataFileThreadNum;
+  public static Gauge.Child gaugeReadLocalIndexFileThreadNum;
+  public static Gauge.Child gaugeReadMemoryDataThreadNum;
+  public static Gauge.Child gaugeReadLocalDataFileBufferSize;
+  public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
+  public static Gauge.Child gaugeReadMemoryDataBufferSize;
 
   public static Gauge gaugeTotalDataSizeUsage;
   public static Gauge gaugeInMemoryDataSizeUsage;
@@ -401,6 +414,17 @@ public class ShuffleServerMetrics {
     gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
     gaugeTotalPartitionNum = 
metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
 
+    gaugeReadLocalDataFileThreadNum =
+        metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_THREAD_NUM);
+    gaugeReadLocalIndexFileThreadNum =
+        metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_THREAD_NUM);
+    gaugeReadMemoryDataThreadNum = 
metricsManager.addLabeledGauge(READ_MEMORY_DATA_THREAD_NUM);
+    gaugeReadLocalDataFileBufferSize =
+        metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_BUFFER_SIZE);
+    gaugeReadLocalIndexFileBufferSize =
+        metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_BUFFER_SIZE);
+    gaugeReadMemoryDataBufferSize = 
metricsManager.addLabeledGauge(READ_MEMORY_DATA_BUFFER_SIZE);
+
     gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
     gaugeAppWithHugePartitionNum = 
metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ed83c0b77..dbda25abc 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -255,7 +256,6 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), 
transportTime);
       }
     }
-    final long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse response;
@@ -266,6 +266,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     if 
(shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
       ShuffleDataResult shuffleDataResult = null;
       try {
+        final long start = System.currentTimeMillis();
         shuffleDataResult =
             shuffleServer
                 .getShuffleTaskManager()
@@ -283,12 +284,14 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           bufferSegments = shuffleDataResult.getBufferSegments();
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+          
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
         }
         response =
             new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, 
bufferSegments, data);
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, readBufferSize, data.size(), requestInfo, req, client);
+                start, readBufferSize, data.size(), requestInfo, req, 
response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (Exception e) {
@@ -359,12 +362,14 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         ManagedBuffer data = shuffleIndexResult.getManagedBuffer();
         ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
         
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.size());
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+        
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, data, 
shuffleIndexResult.getDataFileLen());
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, assumedFileSize, data.size(), requestInfo, req, client);
+                start, assumedFileSize, data.size(), requestInfo, req, 
response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (FileNotFoundException indexFileNotFoundException) {
@@ -465,12 +470,14 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     length);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, sdr.getManagedBuffer());
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, length, sdr.getDataLength(), requestInfo, req, client);
+                start, length, sdr.getDataLength(), requestInfo, req, 
response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (Exception e) {
@@ -531,6 +538,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     private final long dataSize;
     private final String requestInfo;
     private final RequestMessage request;
+    private final RpcResponse response;
     private final TransportClient client;
 
     ReleaseMemoryAndRecordReadTimeListener(
@@ -539,12 +547,14 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         long dataSize,
         String requestInfo,
         RequestMessage request,
+        RpcResponse response,
         TransportClient client) {
       this.readStartedTime = readStartedTime;
       this.readBufferSize = readBufferSize;
       this.dataSize = dataSize;
       this.requestInfo = requestInfo;
       this.request = request;
+      this.response = response;
       this.client = client;
     }
 
@@ -554,6 +564,20 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       long readTime = System.currentTimeMillis() - readStartedTime;
       ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
       
shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), 
readTime);
+      if (request instanceof GetLocalShuffleDataRequest) {
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+        
ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(readBufferSize);
+      } else if (request instanceof GetLocalShuffleIndexRequest) {
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+        
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(readBufferSize);
+      } else if (request instanceof GetMemoryShuffleDataRequest) {
+        GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
+            (GetMemoryShuffleDataResponse) response;
+        if 
(CollectionUtils.isNotEmpty(getMemoryShuffleDataResponse.getBufferSegments())) {
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+          
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
+        }
+      }
       if (!future.isSuccess()) {
         Throwable cause = future.cause();
         String errorMsg =

Reply via email to