This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6e72951  [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle
6e72951 is described below

commit 6e729515fd2bb228afed964b50f0d02329684934
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Fri Aug 6 09:47:42 2021 -0500

    [SPARK-36423][SHUFFLE] Randomize order of blocks in a push request to improve block merge ratio for push-based shuffle

    ### What changes were proposed in this pull request?

    On the client side, we currently randomize the order of push requests before processing each request.
    In addition, we can randomize the order of blocks within each push request before pushing them.
    In our benchmark, this resulted in a 60%-70% reduction in blocks that fail to be merged due to block collision (the existing block merge ratio is already pretty good in general, and this further improves it).

    ### Why are the changes needed?

    Improve block merge ratio for push-based shuffle.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Straightforward small change, no additional test needed.

    Closes #33649 from Victsm/SPARK-36423.

    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <victor....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 56f915b..ecaa4f0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -242,10 +242,16 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         handleResult(PushResult(blockId, exception))
       }
     }
+    // In addition to randomizing the order of the push requests, further randomize the order
+    // of blocks within the push request to further reduce the likelihood of shuffle server side
+    // collision of pushed blocks. This does not increase the cost of reading unmerged shuffle
+    // files on the executor side, because we are still reading MB-size chunks and only randomize
+    // the in-memory sliced buffers post reading.
+    val (blockPushIds, blockPushBuffers) = Utils.randomize(blockIds.zip(
+      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)))).unzip
     SparkEnv.get.blockManager.blockStoreClient.pushBlocks(
-      address.host, address.port, blockIds.toArray,
-      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2)),
-      blockPushListener)
+      address.host, address.port, blockPushIds.toArray,
+      blockPushBuffers.toArray, blockPushListener)
   }
 
   /**
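
The zip / randomize / unzip pattern in the diff above can be illustrated outside Spark. The following is a minimal sketch, not the code from this commit: `PushOrderSketch`, `randomizePairs`, and the `block-N` ids are hypothetical names chosen for illustration, and `scala.util.Random.shuffle` stands in for Spark's `Utils.randomize` so that the example runs without Spark on the classpath. It shows why the ids and buffers are zipped before shuffling: each block id must stay paired with its own buffer after the order changes.

```scala
import java.nio.ByteBuffer
import scala.util.Random

// Hypothetical helper, for illustration only; not part of Spark.
object PushOrderSketch {

  /**
   * Shuffle block ids and their buffers together.
   * The two sequences are zipped into pairs first, the pairs are shuffled,
   * and the result is unzipped. Shuffling each sequence independently
   * would break the association between an id and its buffer.
   */
  def randomizePairs(
      blockIds: Seq[String],
      buffers: Seq[ByteBuffer],
      rand: Random = new Random()): (Seq[String], Seq[ByteBuffer]) = {
    require(blockIds.length == buffers.length, "each block id needs exactly one buffer")
    rand.shuffle(blockIds.zip(buffers)).unzip
  }

  def main(args: Array[String]): Unit = {
    // Stand-in data: each buffer simply holds the bytes of its own id.
    val ids = Seq("block-0", "block-1", "block-2", "block-3")
    val bufs = ids.map(id => ByteBuffer.wrap(id.getBytes("UTF-8")))

    val (pushIds, pushBufs) = randomizePairs(ids, bufs)

    // The order varies from run to run, but every id still lines up with its buffer.
    pushIds.zip(pushBufs).foreach { case (id, buf) =>
      println(s"$id -> ${new String(buf.array(), "UTF-8")}")
    }
  }
}
```

Only the in-memory pairs are reordered here, which mirrors the point made in the code comment of the diff: the data has already been read in large chunks, so randomizing the sliced buffers afterwards adds no extra read cost.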