This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 057dcd227 [#2460] feat(spark)(part-4): Hybrid storage reading 
statistics (#2468)
057dcd227 is described below

commit 057dcd227286cbfc409b6cfc5787e6057190119b
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Wed Apr 30 13:42:32 2025 +0800

    [#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468)
    
    ### What changes were proposed in this pull request?
    
    1. Add support for hybrid storage reading statistics
    
    
![image](https://github.com/user-attachments/assets/c63c95fc-9e32-4f80-b251-ce1cdc811883)
    
    ### Why are the changes needed?
    
    Follow-up of #2460.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
---
 .../spark/shuffle/events/ShuffleReadMetric.java    | 52 ++++++++++++++++++-
 .../shuffle/manager/ShuffleManagerGrpcService.java |  9 +++-
 .../scala/org/apache/spark/UniffleListener.scala   | 16 ++++--
 .../org/apache/spark/UniffleStatusStore.scala      | 10 +++-
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 47 +++++++++++++++++
 .../spark/shuffle/reader/RssShuffleReader.java     |  8 ++-
 .../request/RssReportShuffleReadMetricRequest.java | 59 +++++++++++++++++++++-
 proto/src/main/proto/Rss.proto                     |  9 ++++
 .../handler/impl/ShuffleServerReadCost.java        | 24 +++++++++
 9 files changed, 226 insertions(+), 8 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
index 1cb54c69f..dbbd7d463 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
@@ -18,7 +18,57 @@
 package org.apache.spark.shuffle.events;
 
 public class ShuffleReadMetric extends ShuffleMetric {
-  public ShuffleReadMetric(long durationMillis, long byteSize) {
+  private final long memoryDurationMillis;
+  private final long memoryByteSize;
+
+  private final long localfileDurationMillis;
+  private final long localfileByteSize;
+
+  private final long hadoopDurationMillis;
+  private final long hadoopByteSize;
+
+  public ShuffleReadMetric(
+      long durationMillis,
+      long byteSize,
+      long memoryDurationMillis,
+      long memoryByteSize,
+      long localfileDurationMillis,
+      long localfileByteSize,
+      long hadoopDurationMillis,
+      long hadoopByteSize) {
     super(durationMillis, byteSize);
+
+    this.memoryDurationMillis = memoryDurationMillis;
+    this.memoryByteSize = memoryByteSize;
+
+    this.localfileDurationMillis = localfileDurationMillis;
+    this.localfileByteSize = localfileByteSize;
+
+    this.hadoopDurationMillis = hadoopDurationMillis;
+    this.hadoopByteSize = hadoopByteSize;
+  }
+
+  public long getMemoryDurationMillis() {
+    return memoryDurationMillis;
+  }
+
+  public long getMemoryByteSize() {
+    return memoryByteSize;
+  }
+
+  public long getLocalfileDurationMillis() {
+    return localfileDurationMillis;
+  }
+
+  public long getLocalfileByteSize() {
+    return localfileByteSize;
+  }
+
+  public long getHadoopDurationMillis() {
+    return hadoopDurationMillis;
+  }
+
+  public long getHadoopByteSize() {
+    return hadoopByteSize;
   }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 2ba39eac1..213f1a774 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -757,7 +757,14 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                         Map.Entry::getKey,
                         x ->
                             new ShuffleReadMetric(
-                                x.getValue().getDurationMillis(), 
x.getValue().getByteSize()))));
+                                x.getValue().getDurationMillis(),
+                                x.getValue().getByteSize(),
+                                x.getValue().getMemoryDurationMillis(),
+                                x.getValue().getMemoryByteSize(),
+                                x.getValue().getLocalfileDurationMillis(),
+                                x.getValue().getLocalfileByteSize(),
+                                x.getValue().getHadoopDurationMillis(),
+                                x.getValue().getHadoopByteSize()))));
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleReadMetricResponse reply =
         RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 4d3b19521..8c84593ea 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -105,9 +105,19 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
     val metrics = event.getMetrics
     for (metric <- metrics.asScala) {
       val id = metric._1
-      val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ 
=> new AggregatedShuffleReadMetric(0, 0))
-      agg_metric.byteSize += metric._2.getByteSize
-      agg_metric.durationMillis += metric._2.getDurationMillis
+      val rmetric = metric._2
+      val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ 
=> new AggregatedShuffleReadMetric(0, 0, 0, 0, 0, 0, 0, 0))
+      agg_metric.byteSize += rmetric.getByteSize
+      agg_metric.durationMillis += rmetric.getDurationMillis
+
+      agg_metric.memoryByteSize += rmetric.getMemoryByteSize
+      agg_metric.memoryDurationMills += rmetric.getMemoryDurationMillis
+
+      agg_metric.localfileDurationMillis += rmetric.getLocalfileDurationMillis
+      agg_metric.localfileByteSize += rmetric.getLocalfileByteSize
+
+      agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
+      agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
   }
 
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index b7578d25f..021311981 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -113,7 +113,15 @@ class AggregatedShuffleReadMetricsUIData(val metrics: 
ConcurrentHashMap[String,
   @KVIndex
   def id: String = classOf[AggregatedShuffleReadMetricsUIData].getName()
 }
-class AggregatedShuffleReadMetric(durationMillis: Long, byteSize: Long)
+
+class AggregatedShuffleReadMetric(durationMillis: Long,
+                                  byteSize: Long,
+                                  var memoryDurationMills: Long,
+                                  var memoryByteSize: Long,
+                                  var localfileDurationMillis: Long,
+                                  var localfileByteSize: Long,
+                                  var hadoopDurationMillis: Long,
+                                  var hadoopByteSize: Long)
   extends AggregatedShuffleMetric(durationMillis, byteSize)
 
 // task total cpu time
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index cef2c4297..32864da90 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -143,6 +143,40 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
       fixedWidth = true
     )
 
+    // render reading hybrid storage statistics
+    val readMetrics = originReadMetric.metrics
+    val aggregatedByStorage = readMetrics.asScala.values
+      .flatMap { metric =>
+        Seq(
+          ("MEMORY", metric.memoryByteSize, metric.memoryDurationMills),
+          ("LOCALFILE", metric.localfileByteSize, 
metric.localfileDurationMillis),
+          ("HADOOP", metric.hadoopByteSize, metric.hadoopDurationMillis)
+        )
+      }
+      .groupBy(_._1)
+      .mapValues { values =>
+        val totalBytes = values.map(_._2).sum
+        val totalTime = values.map(_._3).sum
+        val speed = if (totalTime != 0) totalBytes.toDouble / totalTime / 1000 
else 0L
+        (totalBytes, totalTime, speed)
+      }
+      .toSeq
+    val readTableUI = UIUtils.listingTable(
+      Seq("Storage Type", "Read Bytes", "Read Time", "Read Speed (MB/sec)"),
+      { row: (String, Long, Long, Double) =>
+        <tr>
+          <td>{row._1}</td>
+          <td>{Utils.bytesToString(row._2)}</td>
+          <td>{UIUtils.formatDuration(row._3)}</td>
+          <td>{roundToTwoDecimals(row._4)}</td>
+        </tr>
+      },
+      aggregatedByStorage.map { case (storageType, (bytes, time, speed)) =>
+        (storageType, bytes, time, speed)
+      },
+      fixedWidth = true
+    )
+
     // render assignment info
     val assignmentInfos = runtimeStatusStore.assignmentInfos
     val assignmentTableUI = UIUtils.listingTable(
@@ -211,6 +245,19 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
           </div>
         </div>
 
+        <div>
+          <span class="collapse-read-throughput-properties collapse-table"
+                onClick="collapseTable('collapse-read-throughput-properties', 
'read-statistics-table')">
+            <h4>
+              <span class="collapse-table-arrow arrow-closed"></span>
+              <a>Hybrid Storage Read Statistics</a>
+            </h4>
+          </span>
+          <div class="read-statistics-table collapsible-table collapsed">
+            {readTableUI}
+          </div>
+        </div>
+
         <div>
           <span class="collapse-server-properties collapse-table"
                 onClick="collapseTable('collapse-server-properties', 
'all-servers-table')">
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 6c6b3760c..a1c345e84 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -356,7 +356,13 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
                                 x ->
                                     new 
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
                                         x.getValue().getDurationMillis(),
-                                        x.getValue().getReadBytes())))));
+                                        x.getValue().getReadBytes(),
+                                        
x.getValue().getMemoryReadDurationMillis(),
+                                        x.getValue().getMemoryReadBytes(),
+                                        
x.getValue().getLocalfileReadDurationMillis(),
+                                        x.getValue().getLocalfileReadBytes(),
+                                        
x.getValue().getHadoopReadLocalFileDurationMillis(),
+                                        
x.getValue().getHadoopReadLocalFileBytes())))));
         if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
           LOG.error("Errors on reporting shuffle read metrics to driver");
         }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index 17d95d0d2..f1faebe0e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -53,6 +53,13 @@ public class RssReportShuffleReadMetricRequest {
                             RssProtos.ShuffleReadMetric.newBuilder()
                                 .setByteSize(x.getValue().getByteSize())
                                 
.setDurationMillis(x.getValue().getDurationMillis())
+                                
.setMemoryByteSize(x.getValue().getMemoryByteSize())
+                                
.setMemoryDurationMillis(x.getValue().getMemoryDurationMillis())
+                                
.setLocalfileByteSize(x.getValue().getLocalfileByteSize())
+                                .setLocalfileDurationMillis(
+                                    x.getValue().getLocalfileDurationMillis())
+                                
.setHadoopByteSize(x.getValue().getHadoopByteSize())
+                                
.setHadoopDurationMillis(x.getValue().getHadoopDurationMillis())
                                 .build())));
     return builder.build();
   }
@@ -61,9 +68,35 @@ public class RssReportShuffleReadMetricRequest {
     private long durationMillis;
     private long byteSize;
 
-    public TaskShuffleReadMetric(long durationMillis, long byteSize) {
+    private long localfileByteSize;
+    private long localfileDurationMillis;
+
+    private long memoryByteSize;
+    private long memoryDurationMillis;
+
+    private long hadoopByteSize;
+    private long hadoopDurationMillis;
+
+    public TaskShuffleReadMetric(
+        long durationMillis,
+        long byteSize,
+        long memoryReadDurationMillis,
+        long memoryReadBytes,
+        long localfileReadDurationMillis,
+        long localfileReadBytes,
+        long hadoopReadLocalFileDurationMillis,
+        long hadoopReadLocalFileBytes) {
       this.durationMillis = durationMillis;
       this.byteSize = byteSize;
+
+      this.localfileByteSize = localfileReadBytes;
+      this.localfileDurationMillis = localfileReadDurationMillis;
+
+      this.memoryByteSize = memoryReadBytes;
+      this.memoryDurationMillis = memoryReadDurationMillis;
+
+      this.hadoopByteSize = hadoopReadLocalFileBytes;
+      this.hadoopDurationMillis = hadoopReadLocalFileDurationMillis;
     }
 
     public long getDurationMillis() {
@@ -73,5 +106,29 @@ public class RssReportShuffleReadMetricRequest {
     public long getByteSize() {
       return byteSize;
     }
+
+    public long getLocalfileByteSize() {
+      return localfileByteSize;
+    }
+
+    public long getLocalfileDurationMillis() {
+      return localfileDurationMillis;
+    }
+
+    public long getMemoryByteSize() {
+      return memoryByteSize;
+    }
+
+    public long getMemoryDurationMillis() {
+      return memoryDurationMillis;
+    }
+
+    public long getHadoopByteSize() {
+      return hadoopByteSize;
+    }
+
+    public long getHadoopDurationMillis() {
+      return hadoopDurationMillis;
+    }
   }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index e47c51e13..4e5317587 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -604,6 +604,15 @@ message ShuffleWriteMetric {
 message ShuffleReadMetric {
   int64 durationMillis = 1;
   int64 byteSize = 2;
+
+  int64 memoryDurationMillis = 3;
+  int64 memoryByteSize = 4;
+
+  int64 localfileDurationMillis = 5;
+  int64 localfileByteSize = 6;
+
+  int64 hadoopDurationMillis = 7;
+  int64 hadoopByteSize = 8;
 }
 
 message ReportShuffleWriteMetricResponse {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
index e8920108c..2c5e7fc4e 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
@@ -81,4 +81,28 @@ public class ShuffleServerReadCost {
   public long getReadBytes() {
     return readBytes.get();
   }
+
+  public long getMemoryReadBytes() {
+    return memoryReadBytes.get();
+  }
+
+  public long getMemoryReadDurationMillis() {
+    return memoryReadDurationMillis.get();
+  }
+
+  public long getLocalfileReadBytes() {
+    return localfileReadBytes.get();
+  }
+
+  public long getHadoopReadLocalFileBytes() {
+    return hadoopReadLocalFileBytes.get();
+  }
+
+  public long getLocalfileReadDurationMillis() {
+    return localfileReadDurationMillis.get();
+  }
+
+  public long getHadoopReadLocalFileDurationMillis() {
+    return hadoopReadLocalFileDurationMillis.get();
+  }
 }

Reply via email to