This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 415f50511463 [SPARK-54536][CORE] Shuffle FetchWaitTime missing collect 
create client/wait cost
415f50511463 is described below

commit 415f50511463a8e73af77cf9f70cba4292c1331d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Dec 3 07:38:54 2025 -0800

    [SPARK-54536][CORE] Shuffle FetchWaitTime missing collect create 
client/wait cost
    
    ### What changes were proposed in this pull request?
    When ShuffleBlockFetcherIterator fetches data, two sources of shuffle
    wait cost are not accounted for.
    
    1. Waiting caused by network resource congestion between
    `fetchUpToMaxBytes` and `fetchAllHostLocalBlocks`;
    2. Connection establishment congestion. When `fetchUpToMaxBytes` and
    `fetchAllHostLocalBlocks` send requests, creating the client may be
    delayed by congestion.
    
    ### Why are the changes needed?
    Make the shuffle fetch wait time more accurate by also counting the time
    spent sending requests.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a Thread.sleep(3000) latency to the open block request; the
    resulting shuffle read metrics are shown below.
    
    <img width="1724" height="829" alt="截屏2025-11-27 17 38 26" src="https://github.com/user-attachments/assets/99f3822d-d5a7-4f4a-abfc-cc272e61667c" />
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53245 from AngersZhuuuu/SPARK-54536.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/storage/ShuffleBlockFetcherIterator.scala   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b2f185bc590f..cc552a2985f7 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -193,6 +193,14 @@ final class ShuffleBlockFetcherIterator(
 
   initialize()
 
+  private def withFetchWaitTimeTracked[T](f: => T): T = {
+    val startFetchWait = System.nanoTime()
+    val res = f
+    val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startFetchWait)
+    shuffleMetrics.incFetchWaitTime(fetchWaitTime)
+    res
+  }
+
   // Decrements the buffer reference count.
   // The currentResult is set to null to prevent releasing the buffer again on 
cleanup()
   private[storage] def releaseCurrentResultBuffer(): Unit = {
@@ -718,7 +726,7 @@ final class ShuffleBlockFetcherIterator(
       ", expected bytesInFlight = 0 but found bytesInFlight = " + 
bytesInFlight)
 
     // Send out initial requests for blocks, up to our maxBytesInFlight
-    fetchUpToMaxBytes()
+    withFetchWaitTimeTracked(fetchUpToMaxBytes())
 
     val numDeferredRequest = deferredFetchRequests.values.map(_.size).sum
     val numFetches = remoteRequests.size - fetchRequests.size - 
numDeferredRequest
@@ -731,7 +739,7 @@ final class ShuffleBlockFetcherIterator(
     fetchLocalBlocks(localBlocks)
     logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
     // Get host local blocks if any
-    fetchAllHostLocalBlocks(hostLocalBlocksByExecutor)
+    
withFetchWaitTimeTracked(fetchAllHostLocalBlocks(hostLocalBlocksByExecutor))
     pushBasedFetchHelper.fetchAllPushMergedLocalBlocks(pushMergedLocalBlocks)
   }
 
@@ -813,10 +821,7 @@ final class ShuffleBlockFetcherIterator(
     // is also corrupt, so the previous stage could be retried.
     // For local shuffle block, throw FailureFetchResult for the first 
IOException.
     while (result == null) {
-      val startFetchWait = System.nanoTime()
-      result = results.take()
-      val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startFetchWait)
-      shuffleMetrics.incFetchWaitTime(fetchWaitTime)
+      result = withFetchWaitTimeTracked[FetchResult](results.take())
 
       result match {
         case SuccessFetchResult(blockId, mapIndex, address, size, buf, 
isNetworkReqDone) =>
@@ -1076,7 +1081,7 @@ final class ShuffleBlockFetcherIterator(
       }
 
       // Send fetch requests up to maxBytesInFlight
-      fetchUpToMaxBytes()
+      withFetchWaitTimeTracked(fetchUpToMaxBytes())
     }
 
     currentResult = result.asInstanceOf[SuccessFetchResult]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to