This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new 057dcd227 [#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468)
057dcd227 is described below

commit 057dcd227286cbfc409b6cfc5787e6057190119b
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Wed Apr 30 13:42:32 2025 +0800

    [#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468)

    ### What changes were proposed in this pull request?

    1. Add support for hybrid storage reading statistics

    ### Why are the changes needed?

    Follow-up to #2460.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Internal tests.
---
 .../spark/shuffle/events/ShuffleReadMetric.java    | 52 ++++++++++++++++++-
 .../shuffle/manager/ShuffleManagerGrpcService.java |  9 +++-
 .../scala/org/apache/spark/UniffleListener.scala   | 16 ++++--
 .../org/apache/spark/UniffleStatusStore.scala      | 10 +++-
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 47 +++++++++++++++++
 .../spark/shuffle/reader/RssShuffleReader.java     |  8 ++-
 .../request/RssReportShuffleReadMetricRequest.java | 59 +++++++++++++++++++++-
 proto/src/main/proto/Rss.proto                     |  9 ++++
 .../handler/impl/ShuffleServerReadCost.java        | 24 +++++++++
 9 files changed, 226 insertions(+), 8 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
index 1cb54c69f..dbbd7d463 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
@@ -18,7 +18,57 @@ package org.apache.spark.shuffle.events;
 
 public class ShuffleReadMetric extends ShuffleMetric {
 
-  public ShuffleReadMetric(long durationMillis, long byteSize) {
+  private final long memoryDurationMillis;
+  private final long memoryByteSize;
+
+  private final long localfileDurationMillis;
+  private final long localfileByteSize;
+
+  private final long hadoopDurationMillis;
+  private final long hadoopByteSize;
+
+  public ShuffleReadMetric(
+      long durationMillis,
+      long byteSize,
+      long memoryDurationMillis,
+      long memoryByteSize,
+      long localfileDurationMillis,
+      long localfileByteSize,
+      long hadoopDurationMillis,
+      long hadoopByteSize) {
     super(durationMillis, byteSize);
+
+    this.memoryDurationMillis = memoryDurationMillis;
+    this.memoryByteSize = memoryByteSize;
+
+    this.localfileDurationMillis = localfileDurationMillis;
+    this.localfileByteSize = localfileByteSize;
+
+    this.hadoopDurationMillis = hadoopDurationMillis;
+    this.hadoopByteSize = hadoopByteSize;
+  }
+
+  public long getMemoryDurationMillis() {
+    return memoryDurationMillis;
+  }
+
+  public long getMemoryByteSize() {
+    return memoryByteSize;
+  }
+
+  public long getLocalfileDurationMillis() {
+    return localfileDurationMillis;
+  }
+
+  public long getLocalfileByteSize() {
+    return localfileByteSize;
+  }
+
+  public long getHadoopDurationMillis() {
+    return hadoopDurationMillis;
+  }
+
+  public long getHadoopByteSize() {
+    return hadoopByteSize;
   }
 }
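For illustration, a minimal sketch of invoking the widened constructor above. The class and eight-argument signature come from the diff; the wrapper class and numbers are invented and not part of this commit:

    import org.apache.spark.shuffle.events.ShuffleReadMetric;

    public class ShuffleReadMetricExample {
      public static void main(String[] args) {
        // Invented numbers: a 4096-byte read taking 120 ms, split across the tiers.
        ShuffleReadMetric metric =
            new ShuffleReadMetric(
                120L, 4096L, // total durationMillis, byteSize
                10L, 1024L,  // memory: durationMillis, byteSize
                80L, 2048L,  // localfile: durationMillis, byteSize
                30L, 1024L); // hadoop: durationMillis, byteSize
        System.out.println(metric.getLocalfileByteSize()); // prints 2048
      }
    }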
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 2ba39eac1..213f1a774 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -757,7 +757,14 @@ public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
                         Map.Entry::getKey,
                         x ->
                             new ShuffleReadMetric(
-                                x.getValue().getDurationMillis(), x.getValue().getByteSize()))));
+                                x.getValue().getDurationMillis(),
+                                x.getValue().getByteSize(),
+                                x.getValue().getMemoryDurationMillis(),
+                                x.getValue().getMemoryByteSize(),
+                                x.getValue().getLocalfileDurationMillis(),
+                                x.getValue().getLocalfileByteSize(),
+                                x.getValue().getHadoopDurationMillis(),
+                                x.getValue().getHadoopByteSize()))));
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleReadMetricResponse reply =
         RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 4d3b19521..8c84593ea 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -105,9 +105,19 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
     val metrics = event.getMetrics
     for (metric <- metrics.asScala) {
       val id = metric._1
-      val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ => new AggregatedShuffleReadMetric(0, 0))
-      agg_metric.byteSize += metric._2.getByteSize
-      agg_metric.durationMillis += metric._2.getDurationMillis
+      val rmetric = metric._2
+      val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ => new AggregatedShuffleReadMetric(0, 0, 0, 0, 0, 0, 0, 0))
+      agg_metric.byteSize += rmetric.getByteSize
+      agg_metric.durationMillis += rmetric.getDurationMillis
+
+      agg_metric.memoryByteSize += rmetric.getMemoryByteSize
+      agg_metric.memoryDurationMills += rmetric.getMemoryDurationMillis
+
+      agg_metric.localfileDurationMillis += rmetric.getLocalfileDurationMillis
+      agg_metric.localfileByteSize += rmetric.getLocalfileByteSize
+
+      agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
+      agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
   }
 
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index b7578d25f..021311981 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -113,7 +113,15 @@ class AggregatedShuffleReadMetricsUIData(val metrics: ConcurrentHashMap[String,
   @KVIndex
   def id: String = classOf[AggregatedShuffleReadMetricsUIData].getName()
 }
-class AggregatedShuffleReadMetric(durationMillis: Long, byteSize: Long)
+
+class AggregatedShuffleReadMetric(durationMillis: Long,
+                                  byteSize: Long,
+                                  var memoryDurationMills: Long,
+                                  var memoryByteSize: Long,
+                                  var localfileDurationMillis: Long,
+                                  var localfileByteSize: Long,
+                                  var hadoopDurationMillis: Long,
+                                  var hadoopByteSize: Long)
   extends AggregatedShuffleMetric(durationMillis, byteSize)
 
 // task total cpu time
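UniffleListener above folds each reported metric into a per-server AggregatedShuffleReadMetric via computeIfAbsent and mutable fields. A Java sketch of that same accumulate pattern, with a hypothetical holder class standing in for the committed Scala one:

    import java.util.concurrent.ConcurrentHashMap;

    public class AggregationSketch {
      // Hypothetical stand-in for AggregatedShuffleReadMetric's mutable fields.
      static class PerServerAgg {
        long memoryByteSize;
        long memoryDurationMillis;
      }

      private final ConcurrentHashMap<String, PerServerAgg> byServer = new ConcurrentHashMap<>();

      void onReadMetric(String serverId, long memBytes, long memMillis) {
        // Create the accumulator on first sight of this server, then mutate it.
        PerServerAgg agg = byServer.computeIfAbsent(serverId, k -> new PerServerAgg());
        // The += updates are not atomic; like the listener, this presumes events
        // for one listener arrive from a single dispatcher thread.
        agg.memoryByteSize += memBytes;
        agg.memoryDurationMillis += memMillis;
      }
    }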
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index cef2c4297..32864da90 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -143,6 +143,40 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
       fixedWidth = true
     )
 
+    // render reading hybrid storage statistics
+    val readMetrics = originReadMetric.metrics
+    val aggregatedByStorage = readMetrics.asScala.values
+      .flatMap { metric =>
+        Seq(
+          ("MEMORY", metric.memoryByteSize, metric.memoryDurationMills),
+          ("LOCALFILE", metric.localfileByteSize, metric.localfileDurationMillis),
+          ("HADOOP", metric.hadoopByteSize, metric.hadoopDurationMillis)
+        )
+      }
+      .groupBy(_._1)
+      .mapValues { values =>
+        val totalBytes = values.map(_._2).sum
+        val totalTime = values.map(_._3).sum
+        val speed = if (totalTime != 0) totalBytes.toDouble / totalTime / 1000 else 0L
+        (totalBytes, totalTime, speed)
+      }
+      .toSeq
+    val readTableUI = UIUtils.listingTable(
+      Seq("Storage Type", "Read Bytes", "Read Time", "Read Speed (MB/sec)"),
+      { row: (String, Long, Long, Double) =>
+        <tr>
+          <td>{row._1}</td>
+          <td>{Utils.bytesToString(row._2)}</td>
+          <td>{UIUtils.formatDuration(row._3)}</td>
+          <td>{roundToTwoDecimals(row._4)}</td>
+        </tr>
+      },
+      aggregatedByStorage.map { case (storageType, (bytes, time, speed)) =>
+        (storageType, bytes, time, speed)
+      },
+      fixedWidth = true
+    )
+
     // render assignment info
     val assignmentInfos = runtimeStatusStore.assignmentInfos
     val assignmentTableUI = UIUtils.listingTable(
@@ -211,6 +245,19 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
         </div>
       </div>
 
+      <div>
+        <span class="collapse-read-throughput-properties collapse-table"
+              onClick="collapseTable('collapse-read-throughput-properties', 'read-statistics-table')">
+          <h4>
+            <span class="collapse-table-arrow arrow-closed"></span>
+            <a>Hybrid Storage Read Statistics</a>
+          </h4>
+        </span>
+        <div class="read-statistics-table collapsible-table collapsed">
+          {readTableUI}
+        </div>
+      </div>
+
       <div>
         <span class="collapse-server-properties collapse-table"
               onClick="collapseTable('collapse-server-properties', 'all-servers-table')">
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 6c6b3760c..a1c345e84 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -356,7 +356,13 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
                             x ->
                                 new RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
                                     x.getValue().getDurationMillis(),
-                                    x.getValue().getReadBytes())))));
+                                    x.getValue().getReadBytes(),
+                                    x.getValue().getMemoryReadDurationMillis(),
+                                    x.getValue().getMemoryReadBytes(),
+                                    x.getValue().getLocalfileReadDurationMillis(),
+                                    x.getValue().getLocalfileReadBytes(),
+                                    x.getValue().getHadoopReadLocalFileDurationMillis(),
+                                    x.getValue().getHadoopReadLocalFileBytes()))))); 
     if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
       LOG.error("Errors on reporting shuffle read metrics to driver");
     }
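A unit check on the Read Speed column computed in ShufflePage.scala above: totalBytes / totalTime yields bytes per millisecond, and converting to MB/sec (x1000 ms/s, then /10^6 bytes/MB) collapses to a single division by 1000, which is exactly what `totalBytes.toDouble / totalTime / 1000` does. Worked through with invented numbers:

    public class ReadSpeedCheck {
      public static void main(String[] args) {
        long totalBytes = 2_048_000L; // invented: ~2 MB read in total
        long totalTimeMs = 1_000L;    // invented: over one second
        // bytes/ms * 1000 ms/s / 1e6 bytes/MB == bytes/ms / 1000
        double mbPerSec = totalBytes / (double) totalTimeMs / 1000;
        System.out.println(mbPerSec); // prints 2.048
      }
    }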
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index 17d95d0d2..f1faebe0e 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -53,6 +53,13 @@ public class RssReportShuffleReadMetricRequest {
                         RssProtos.ShuffleReadMetric.newBuilder()
                             .setByteSize(x.getValue().getByteSize())
                             .setDurationMillis(x.getValue().getDurationMillis())
+                            .setMemoryByteSize(x.getValue().getMemoryByteSize())
+                            .setMemoryDurationMillis(x.getValue().getMemoryDurationMillis())
+                            .setLocalfileByteSize(x.getValue().getLocalfileByteSize())
+                            .setLocalfileDurationMillis(
+                                x.getValue().getLocalfileDurationMillis())
+                            .setHadoopByteSize(x.getValue().getHadoopByteSize())
+                            .setHadoopDurationMillis(x.getValue().getHadoopDurationMillis())
                             .build())));
     return builder.build();
   }
@@ -61,9 +68,35 @@ public class RssReportShuffleReadMetricRequest {
     private long durationMillis;
     private long byteSize;
 
-    public TaskShuffleReadMetric(long durationMillis, long byteSize) {
+    private long localfileByteSize;
+    private long localfileDurationMillis;
+
+    private long memoryByteSize;
+    private long memoryDurationMillis;
+
+    private long hadoopByteSize;
+    private long hadoopDurationMillis;
+
+    public TaskShuffleReadMetric(
+        long durationMillis,
+        long byteSize,
+        long memoryReadDurationMillis,
+        long memoryReadBytes,
+        long localfileReadDurationMillis,
+        long localfileReadBytes,
+        long hadoopReadLocalFileDurationMillis,
+        long hadoopReadLocalFileBytes) {
       this.durationMillis = durationMillis;
       this.byteSize = byteSize;
+
+      this.localfileByteSize = localfileReadBytes;
+      this.localfileDurationMillis = localfileReadDurationMillis;
+
+      this.memoryByteSize = memoryReadBytes;
+      this.memoryDurationMillis = memoryReadDurationMillis;
+
+      this.hadoopByteSize = hadoopReadLocalFileBytes;
+      this.hadoopDurationMillis = hadoopReadLocalFileDurationMillis;
     }
 
     public long getDurationMillis() {
@@ -73,5 +106,29 @@ public class RssReportShuffleReadMetricRequest {
     public long getByteSize() {
       return byteSize;
     }
+
+    public long getLocalfileByteSize() {
+      return localfileByteSize;
+    }
+
+    public long getLocalfileDurationMillis() {
+      return localfileDurationMillis;
+    }
+
+    public long getMemoryByteSize() {
+      return memoryByteSize;
+    }
+
+    public long getMemoryDurationMillis() {
+      return memoryDurationMillis;
+    }
+
+    public long getHadoopByteSize() {
+      return hadoopByteSize;
+    }
+
+    public long getHadoopDurationMillis() {
+      return hadoopDurationMillis;
+    }
   }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index e47c51e13..4e5317587 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -604,6 +604,15 @@ message ShuffleWriteMetric {
 message ShuffleReadMetric {
   int64 durationMillis = 1;
   int64 byteSize = 2;
+
+  int64 memoryDurationMillis = 3;
+  int64 memoryByteSize = 4;
+
+  int64 localfileDurationMillis = 5;
+  int64 localfileByteSize = 6;
+
+  int64 hadoopDurationMillis = 7;
+  int64 hadoopByteSize = 8;
 }
 
 message ReportShuffleWriteMetricResponse {
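One property worth noting about the proto change: proto3 int64 fields read as 0 when a sender never sets them, so reports from clients built before tags 3-8 existed degrade to zeroed per-storage columns rather than failing to parse. A hedged sketch (the generated-class import path and the surrounding method are assumptions; only the message and field names come from the proto above):

    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.uniffle.proto.RssProtos;

    public class ProtoCompatSketch {
      static long memoryBytes(byte[] wireBytes) throws InvalidProtocolBufferException {
        RssProtos.ShuffleReadMetric metric = RssProtos.ShuffleReadMetric.parseFrom(wireBytes);
        // proto3 scalars default to 0, so messages from older clients read as zero here.
        return metric.getMemoryByteSize();
      }
    }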
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
index e8920108c..2c5e7fc4e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
@@ -81,4 +81,28 @@ public class ShuffleServerReadCost {
   public long getReadBytes() {
     return readBytes.get();
   }
+
+  public long getMemoryReadBytes() {
+    return memoryReadBytes.get();
+  }
+
+  public long getMemoryReadDurationMillis() {
+    return memoryReadDurationMillis.get();
+  }
+
+  public long getLocalfileReadBytes() {
+    return localfileReadBytes.get();
+  }
+
+  public long getHadoopReadLocalFileBytes() {
+    return hadoopReadLocalFileBytes.get();
+  }
+
+  public long getLocalfileReadDurationMillis() {
+    return localfileReadDurationMillis.get();
+  }
+
+  public long getHadoopReadLocalFileDurationMillis() {
+    return hadoopReadLocalFileDurationMillis.get();
+  }
 }
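The ShuffleServerReadCost hunk shows only the new getters; judging by the .get() calls, the backing fields are counters in the AtomicLong style, bumped on each storage-specific read path, though their declarations sit outside this hunk. A hedged sketch of that accumulation side (the field shape and the record method are assumptions, not the committed code):

    import java.util.concurrent.atomic.AtomicLong;

    public class ReadCostSketch {
      // Assumed field shape; the real declarations are outside the diff above.
      private final AtomicLong localfileReadBytes = new AtomicLong();
      private final AtomicLong localfileReadDurationMillis = new AtomicLong();

      // Hypothetical hook a LOCALFILE read path could call after each fetch.
      public void recordLocalfileRead(long bytes, long millis) {
        localfileReadBytes.addAndGet(bytes);
        localfileReadDurationMillis.addAndGet(millis);
      }

      public long getLocalfileReadBytes() {
        return localfileReadBytes.get();
      }

      public long getLocalfileReadDurationMillis() {
        return localfileReadDurationMillis.get();
      }
    }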