This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e72951  [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
6e72951 is described below

commit 6e729515fd2bb228afed964b50f0d02329684934
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Fri Aug 6 09:47:42 2021 -0500

    [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
    
    ### What changes were proposed in this pull request?
    
    On the client side, we currently randomize the order of push requests before processing each request. In addition, we can randomize the order of blocks within each push request before pushing them.
    In our benchmark, this resulted in a 60%-70% reduction of blocks that fail to be merged due to block collision (the existing block merge ratio is already pretty good in general, and this further improves it).
    
    ### Why are the changes needed?
    
    Improve block merge ratio for push-based shuffle
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Straightforward small change, no additional test needed.
    
    Closes #33649 from Victsm/SPARK-36423.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <victor....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 56f915b..ecaa4f0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -242,10 +242,16 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         handleResult(PushResult(blockId, exception))
       }
     }
+    // In addition to randomizing the order of the push requests, further randomize the order
+    // of blocks within the push request to further reduce the likelihood of shuffle server side
+    // collision of pushed blocks. This does not increase the cost of reading unmerged shuffle
+    // files on the executor side, because we are still reading MB-size chunks and only randomize
+    // the in-memory sliced buffers post reading.
+    val (blockPushIds, blockPushBuffers) = Utils.randomize(blockIds.zip(
+      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)))).unzip
     SparkEnv.get.blockManager.blockStoreClient.pushBlocks(
-      address.host, address.port, blockIds.toArray,
-      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)),
-      blockPushListener)
+      address.host, address.port, blockPushIds.toArray,
+      blockPushBuffers.toArray, blockPushListener)
   }
 
   /**
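
The reordering in the patch above can be sketched in isolation. The following is a minimal, self-contained illustration of the zip / shuffle / unzip pattern, not the Spark implementation: `PushOrderSketch`, `randomize`, and `randomizePush` are hypothetical names, `randomize` is only a stand-in for Spark's `Utils.randomize` built on `scala.util.Random.shuffle`, and plain strings and byte arrays stand in for the real block IDs and buffers.

```scala
import scala.util.Random

object PushOrderSketch {
  // Hypothetical stand-in for Spark's Utils.randomize: returns the items in random order.
  def randomize[T](items: Seq[T], rng: Random): Seq[T] = rng.shuffle(items)

  // Zip IDs with buffers, shuffle the pairs, then unzip, so that every
  // block ID stays aligned with its own buffer after reordering.
  def randomizePush(
      blockIds: Seq[String],
      buffers: Seq[Array[Byte]],
      rng: Random): (Seq[String], Seq[Array[Byte]]) = {
    require(blockIds.length == buffers.length, "expected one buffer per block ID")
    randomize(blockIds.zip(buffers), rng).unzip
  }
}
```

Shuffling the pairs rather than the two sequences separately is what keeps the IDs and buffers in matching positions, which the patch relies on when it passes `blockPushIds` and `blockPushBuffers` to `pushBlocks`.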
