This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1642c4dce [#2494] feat(spark): Involve background overlapping 
decompress time in spark UI (#2639)
1642c4dce is described below

commit 1642c4dce78c8337a10b208a8389a090a906998b
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Oct 14 13:48:07 2025 +0800

    [#2494] feat(spark): Involve background overlapping decompress time in 
spark UI (#2639)
    
    ### What changes were proposed in this pull request?
    
    Show the background (overlapping) decompression time in the Spark UI.
    
    ### Why are the changes needed?
    
    It makes the performance improvement ratio easier to observe.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal job tests.
---
 .../apache/spark/shuffle/reader/RssShuffleDataIterator.java | 13 +++++++++----
 .../src/main/scala/org/apache/spark/ui/ShufflePage.scala    |  9 ++++++---
 .../org/apache/uniffle/client/impl/DecompressionWorker.java |  4 ++++
 .../apache/uniffle/client/impl/ShuffleReadClientImpl.java   |  7 ++++++-
 .../java/org/apache/uniffle/common/ShuffleReadTimes.java    | 11 ++++++++++-
 proto/src/main/proto/Rss.proto                              |  1 +
 6 files changed, 36 insertions(+), 9 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 2d462e014..3fdb3bb47 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -126,11 +126,15 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
     if (recordsIterator == null || !recordsIterator.hasNext()) {
       // read next segment
       long startFetch = System.currentTimeMillis();
-      // depends on spark.shuffle.compress, shuffled block may not be 
compressed
       ShuffleBlock shuffleBlock = shuffleReadClient.readShuffleBlockData();
+      long fetchDuration = System.currentTimeMillis() - startFetch;
+
+      // get the buffer, if the block is the DecompressedShuffleBlock,
+      // the duration is the block wait decompression time.
+      long getBuffer = System.currentTimeMillis();
       ByteBuffer rawData = shuffleBlock != null ? shuffleBlock.getByteBuffer() 
: null;
+      long getBufferDuration = System.currentTimeMillis() - getBuffer;
 
-      long readDuration = System.currentTimeMillis() - startFetch;
       if (rawData != null) {
         // collect metrics from raw data
         long rawDataLength = rawData.limit() - rawData.position();
@@ -148,14 +152,15 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
           unCompressedBytesLength += shuffleBlock.getUncompressLength();
         }
         long uncompressionDuration = System.currentTimeMillis() - 
startUncompression;
+        uncompressionDuration += getBufferDuration;
 
         // create new iterator for shuffle data
         long startSerialization = System.currentTimeMillis();
         recordsIterator = createKVIterator(decompressed);
         long serializationDuration = System.currentTimeMillis() - 
startSerialization;
         shuffleReadMetrics.incFetchWaitTime(
-            serializationDuration + uncompressionDuration + readDuration);
-        readTime += readDuration;
+            serializationDuration + uncompressionDuration + fetchDuration);
+        readTime += fetchDuration;
         serializeTime += serializationDuration;
       } else {
         // finish reading records, check data consistent
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 8916916a3..24b8f5be2 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -47,6 +47,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     <td>{kv(3)}</td>
     <td>{kv(4)}</td>
     <td>{kv(5)}</td>
+    <td>{kv(6)}</td>
   </tr>
 
   private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
@@ -159,7 +160,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     val readTimes = runtimeStatusStore.shuffleReadTimes().times
     val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
     val readTimesUI = UIUtils.listingTable(
-      Seq("Total", "Fetch", "Copy", "CRC", "Decompress", "Deserialize"),
+      Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress", 
"Background Decompress"),
       shuffleReadTimesRow,
       Seq(
         Seq(
@@ -167,16 +168,18 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
           UIUtils.formatDuration(readTimes.getFetch),
           UIUtils.formatDuration(readTimes.getCopy),
           UIUtils.formatDuration(readTimes.getCrc),
-          UIUtils.formatDuration(readTimes.getDecompress),
           UIUtils.formatDuration(readTimes.getDeserialize),
+          UIUtils.formatDuration(readTimes.getDecompress),
+          UIUtils.formatDuration(readTimes.getBackgroundDecompress),
         ),
         Seq(
           1,
           readTimes.getFetch.toDouble / readTotal,
           readTimes.getCopy.toDouble / readTotal,
           readTimes.getCrc.toDouble / readTotal,
-          readTimes.getDecompress.toDouble / readTotal,
           readTimes.getDeserialize.toDouble / readTotal,
+          readTimes.getDecompress.toDouble / readTotal,
+          readTimes.getBackgroundDecompress.toDouble / readTotal,
         ).map(x => roundToTwoDecimals(x).toString)
       ),
       fixedWidth = true
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index c57f5519c..41d29511f 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -127,4 +127,8 @@ public class DecompressionWorker {
         wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
     executorService.shutdown();
   }
+
+  public long decompressionMillis() {
+    return decompressionMillis.get() + 
decompressionBufferAllocationMillis.get();
+  }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index dcde0ca3e..d61283742 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -396,6 +396,11 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
 
   @Override
   public ShuffleReadTimes getShuffleReadTimes() {
-    return new ShuffleReadTimes(readDataTime.get(), copyTime.get(), 
crcCheckTime.get());
+    long backgroundDecompressionTime = 0;
+    if (decompressionWorker != null) {
+      backgroundDecompressionTime = decompressionWorker.decompressionMillis();
+    }
+    return new ShuffleReadTimes(
+        readDataTime.get(), copyTime.get(), crcCheckTime.get(), 
backgroundDecompressionTime);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
index dcc3cf539..1c9523355 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
@@ -26,13 +26,15 @@ public class ShuffleReadTimes {
   private long copy;
   private long deserialize;
   private long decompress;
+  private long backgroundDecompress;
 
   public ShuffleReadTimes() {}
 
-  public ShuffleReadTimes(long fetch, long crc, long copy) {
+  public ShuffleReadTimes(long fetch, long crc, long copy, long 
backgroundDecompress) {
     this.fetch = fetch;
     this.crc = crc;
     this.copy = copy;
+    this.backgroundDecompress = backgroundDecompress;
   }
 
   public long getFetch() {
@@ -63,6 +65,10 @@ public class ShuffleReadTimes {
     return decompress;
   }
 
+  public long getBackgroundDecompress() {
+    return backgroundDecompress;
+  }
+
   public void merge(ShuffleReadTimes other) {
     if (other == null) {
       return;
@@ -72,6 +78,7 @@ public class ShuffleReadTimes {
     this.copy += other.copy;
     this.deserialize += other.deserialize;
     this.decompress += other.decompress;
+    this.backgroundDecompress += other.backgroundDecompress;
   }
 
   public long getTotal() {
@@ -85,6 +92,7 @@ public class ShuffleReadTimes {
         .setCopy(copy)
         .setDecompress(decompress)
         .setDeserialize(deserialize)
+        .setBackgroundDecompress(backgroundDecompress)
         .build();
   }
 
@@ -95,6 +103,7 @@ public class ShuffleReadTimes {
     time.copy = proto.getCopy();
     time.decompress = proto.getDecompress();
     time.deserialize = proto.getDeserialize();
+    time.backgroundDecompress = proto.getBackgroundDecompress();
     return time;
   }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index c3db61e39..0950c5e4c 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -656,6 +656,7 @@ message ShuffleReadTimes {
   int64 copy = 3;
   int64 deserialize = 4;
   int64 decompress = 5;
+  int64 backgroundDecompress = 6;
 }
 
 message ReportShuffleReadMetricResponse {

Reply via email to