This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 415f50511463 [SPARK-54536][CORE] Shuffle FetchWaitTime missing collect
create client/wait cost
415f50511463 is described below
commit 415f50511463a8e73af77cf9f70cba4292c1331d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Dec 3 07:38:54 2025 -0800
[SPARK-54536][CORE] Shuffle FetchWaitTime missing collect create
client/wait cost
### What changes were proposed in this pull request?
When ShuffleBlockFetcherIterator fetches data, two shuffle costs are not
calculated:
1. Network resource congestion and waiting between `fetchUpToMaxBytes` and
`fetchAllHostLocalBlocks`;
2. Connection-establishment congestion: when `fetchUpToMaxBytes` and
`fetchAllHostLocalBlocks` send requests, creating the client may be delayed
by congestion.
### Why are the changes needed?
Make the shuffle fetch wait time metric more accurately reflect the time
spent issuing fetch requests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A Thread.sleep(3000) latency was added to the open-block request; the
resulting shuffle read metrics are shown below.
<img width="1724" height="829" alt="截屏2025-11-27 17 38 26"
src="https://github.com/user-attachments/assets/99f3822d-d5a7-4f4a-abfc-cc272e61667c"
/>
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53245 from AngersZhuuuu/SPARK-54536.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/storage/ShuffleBlockFetcherIterator.scala | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b2f185bc590f..cc552a2985f7 100644
---
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -193,6 +193,14 @@ final class ShuffleBlockFetcherIterator(
initialize()
+ private def withFetchWaitTimeTracked[T](f: => T): T = {
+ val startFetchWait = System.nanoTime()
+ val res = f
+ val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startFetchWait)
+ shuffleMetrics.incFetchWaitTime(fetchWaitTime)
+ res
+ }
+
// Decrements the buffer reference count.
// The currentResult is set to null to prevent releasing the buffer again on
cleanup()
private[storage] def releaseCurrentResultBuffer(): Unit = {
@@ -718,7 +726,7 @@ final class ShuffleBlockFetcherIterator(
", expected bytesInFlight = 0 but found bytesInFlight = " +
bytesInFlight)
// Send out initial requests for blocks, up to our maxBytesInFlight
- fetchUpToMaxBytes()
+ withFetchWaitTimeTracked(fetchUpToMaxBytes())
val numDeferredRequest = deferredFetchRequests.values.map(_.size).sum
val numFetches = remoteRequests.size - fetchRequests.size -
numDeferredRequest
@@ -731,7 +739,7 @@ final class ShuffleBlockFetcherIterator(
fetchLocalBlocks(localBlocks)
logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
// Get host local blocks if any
- fetchAllHostLocalBlocks(hostLocalBlocksByExecutor)
+
withFetchWaitTimeTracked(fetchAllHostLocalBlocks(hostLocalBlocksByExecutor))
pushBasedFetchHelper.fetchAllPushMergedLocalBlocks(pushMergedLocalBlocks)
}
@@ -813,10 +821,7 @@ final class ShuffleBlockFetcherIterator(
// is also corrupt, so the previous stage could be retried.
// For local shuffle block, throw FailureFetchResult for the first
IOException.
while (result == null) {
- val startFetchWait = System.nanoTime()
- result = results.take()
- val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startFetchWait)
- shuffleMetrics.incFetchWaitTime(fetchWaitTime)
+ result = withFetchWaitTimeTracked[FetchResult](results.take())
result match {
case SuccessFetchResult(blockId, mapIndex, address, size, buf,
isNetworkReqDone) =>
@@ -1076,7 +1081,7 @@ final class ShuffleBlockFetcherIterator(
}
// Send fetch requests up to maxBytesInFlight
- fetchUpToMaxBytes()
+ withFetchWaitTimeTracked(fetchUpToMaxBytes())
}
currentResult = result.asInstanceOf[SuccessFetchResult]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]