This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1642c4dce [#2494] feat(spark): Involve background overlapping
decompress time in spark UI (#2639)
1642c4dce is described below
commit 1642c4dce78c8337a10b208a8389a090a906998b
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Oct 14 13:48:07 2025 +0800
[#2494] feat(spark): Involve background overlapping decompress time in
spark UI (#2639)
### What changes were proposed in this pull request?
Show the time spent on background (overlapping) decompression in the Spark UI.
### Why are the changes needed?
It makes it easier to observe the performance improvement gained from overlapping decompression.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal job tests.
---
.../apache/spark/shuffle/reader/RssShuffleDataIterator.java | 13 +++++++++----
.../src/main/scala/org/apache/spark/ui/ShufflePage.scala | 9 ++++++---
.../org/apache/uniffle/client/impl/DecompressionWorker.java | 4 ++++
.../apache/uniffle/client/impl/ShuffleReadClientImpl.java | 7 ++++++-
.../java/org/apache/uniffle/common/ShuffleReadTimes.java | 11 ++++++++++-
proto/src/main/proto/Rss.proto | 1 +
6 files changed, 36 insertions(+), 9 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 2d462e014..3fdb3bb47 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -126,11 +126,15 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
if (recordsIterator == null || !recordsIterator.hasNext()) {
// read next segment
long startFetch = System.currentTimeMillis();
- // depends on spark.shuffle.compress, shuffled block may not be
compressed
ShuffleBlock shuffleBlock = shuffleReadClient.readShuffleBlockData();
+ long fetchDuration = System.currentTimeMillis() - startFetch;
+
+ // get the buffer, if the block is the DecompressedShuffleBlock,
+ // the duration is the block wait decompression time.
+ long getBuffer = System.currentTimeMillis();
ByteBuffer rawData = shuffleBlock != null ? shuffleBlock.getByteBuffer()
: null;
+ long getBufferDuration = System.currentTimeMillis() - getBuffer;
- long readDuration = System.currentTimeMillis() - startFetch;
if (rawData != null) {
// collect metrics from raw data
long rawDataLength = rawData.limit() - rawData.position();
@@ -148,14 +152,15 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
unCompressedBytesLength += shuffleBlock.getUncompressLength();
}
long uncompressionDuration = System.currentTimeMillis() -
startUncompression;
+ uncompressionDuration += getBufferDuration;
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
recordsIterator = createKVIterator(decompressed);
long serializationDuration = System.currentTimeMillis() -
startSerialization;
shuffleReadMetrics.incFetchWaitTime(
- serializationDuration + uncompressionDuration + readDuration);
- readTime += readDuration;
+ serializationDuration + uncompressionDuration + fetchDuration);
+ readTime += fetchDuration;
serializeTime += serializationDuration;
} else {
// finish reading records, check data consistent
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 8916916a3..24b8f5be2 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -47,6 +47,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
<td>{kv(3)}</td>
<td>{kv(4)}</td>
<td>{kv(5)}</td>
+ <td>{kv(6)}</td>
</tr>
private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
@@ -159,7 +160,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
val readTimes = runtimeStatusStore.shuffleReadTimes().times
val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
val readTimesUI = UIUtils.listingTable(
- Seq("Total", "Fetch", "Copy", "CRC", "Decompress", "Deserialize"),
+ Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress",
"Background Decompress"),
shuffleReadTimesRow,
Seq(
Seq(
@@ -167,16 +168,18 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
UIUtils.formatDuration(readTimes.getFetch),
UIUtils.formatDuration(readTimes.getCopy),
UIUtils.formatDuration(readTimes.getCrc),
- UIUtils.formatDuration(readTimes.getDecompress),
UIUtils.formatDuration(readTimes.getDeserialize),
+ UIUtils.formatDuration(readTimes.getDecompress),
+ UIUtils.formatDuration(readTimes.getBackgroundDecompress),
),
Seq(
1,
readTimes.getFetch.toDouble / readTotal,
readTimes.getCopy.toDouble / readTotal,
readTimes.getCrc.toDouble / readTotal,
- readTimes.getDecompress.toDouble / readTotal,
readTimes.getDeserialize.toDouble / readTotal,
+ readTimes.getDecompress.toDouble / readTotal,
+ readTimes.getBackgroundDecompress.toDouble / readTotal,
).map(x => roundToTwoDecimals(x).toString)
),
fixedWidth = true
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index c57f5519c..41d29511f 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -127,4 +127,8 @@ public class DecompressionWorker {
wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
executorService.shutdown();
}
+
+ public long decompressionMillis() {
+ return decompressionMillis.get() +
decompressionBufferAllocationMillis.get();
+ }
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index dcde0ca3e..d61283742 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -396,6 +396,11 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
@Override
public ShuffleReadTimes getShuffleReadTimes() {
- return new ShuffleReadTimes(readDataTime.get(), copyTime.get(),
crcCheckTime.get());
+ long backgroundDecompressionTime = 0;
+ if (decompressionWorker != null) {
+ backgroundDecompressionTime = decompressionWorker.decompressionMillis();
+ }
+ return new ShuffleReadTimes(
+ readDataTime.get(), copyTime.get(), crcCheckTime.get(),
backgroundDecompressionTime);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
index dcc3cf539..1c9523355 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
@@ -26,13 +26,15 @@ public class ShuffleReadTimes {
private long copy;
private long deserialize;
private long decompress;
+ private long backgroundDecompress;
public ShuffleReadTimes() {}
- public ShuffleReadTimes(long fetch, long crc, long copy) {
+ public ShuffleReadTimes(long fetch, long crc, long copy, long
backgroundDecompress) {
this.fetch = fetch;
this.crc = crc;
this.copy = copy;
+ this.backgroundDecompress = backgroundDecompress;
}
public long getFetch() {
@@ -63,6 +65,10 @@ public class ShuffleReadTimes {
return decompress;
}
+ public long getBackgroundDecompress() {
+ return backgroundDecompress;
+ }
+
public void merge(ShuffleReadTimes other) {
if (other == null) {
return;
@@ -72,6 +78,7 @@ public class ShuffleReadTimes {
this.copy += other.copy;
this.deserialize += other.deserialize;
this.decompress += other.decompress;
+ this.backgroundDecompress += other.backgroundDecompress;
}
public long getTotal() {
@@ -85,6 +92,7 @@ public class ShuffleReadTimes {
.setCopy(copy)
.setDecompress(decompress)
.setDeserialize(deserialize)
+ .setBackgroundDecompress(backgroundDecompress)
.build();
}
@@ -95,6 +103,7 @@ public class ShuffleReadTimes {
time.copy = proto.getCopy();
time.decompress = proto.getDecompress();
time.deserialize = proto.getDeserialize();
+ time.backgroundDecompress = proto.getBackgroundDecompress();
return time;
}
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index c3db61e39..0950c5e4c 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -656,6 +656,7 @@ message ShuffleReadTimes {
int64 copy = 3;
int64 deserialize = 4;
int64 decompress = 5;
+ int64 backgroundDecompress = 6;
}
message ReportShuffleReadMetricResponse {