This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8f93dc2 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator 8f93dc2 is described below commit 8f93dc2f1dd8bd09d52fd3dc07a4c10e70bd237c Author: yi.wu <yi...@databricks.com> AuthorDate: Thu Mar 26 09:11:13 2020 -0700 [SPARK-31259][CORE] Fix log message about fetch request size in ShuffleBlockFetcherIterator ### What changes were proposed in this pull request? Fix incorrect log of `curRequestSize`. ### Why are the changes needed? In batch mode, `curRequestSize` can be the total size of several block groups. Each group should have its own request size instead of using the total size. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? It only affects a log message. Closes #28028 from Ngone51/fix_curRequestSize. Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 33f532a9f201fb9c7895d685b3dce82cf042dc61) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f1a7d88..404e055 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequest( blocks: Seq[FetchBlockInfo], - address: BlockManagerId, - curRequestSize: Long): FetchRequest = { - logDebug(s"Creating fetch request of $curRequestSize at $address " + address: BlockManagerId): FetchRequest = { + logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address " + s"with ${blocks.size} blocks") 
FetchRequest(address, blocks) } @@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator( private def createFetchRequests( curBlocks: Seq[FetchBlockInfo], address: BlockManagerId, - curRequestSize: Long, isLast: Boolean, collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = { val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) var retBlocks = Seq.empty[FetchBlockInfo] if (mergedBlocks.length <= maxBlocksInFlightPerAddress) { - collectedRemoteRequests += createFetchRequest(mergedBlocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(mergedBlocks, address) } else { mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks => if (blocks.length == maxBlocksInFlightPerAddress || isLast) { - collectedRemoteRequests += createFetchRequest(blocks, address, curRequestSize) + collectedRemoteRequests += createFetchRequest(blocks, address) } else { // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back // to `curBlocks`. @@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator( // For batch fetch, the actual block in flight should count for merged block. 
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = false, + curBlocks = createFetchRequests(curBlocks, address, isLast = false, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } } // Add in the final request if (curBlocks.nonEmpty) { - curBlocks = createFetchRequests(curBlocks, address, curRequestSize, isLast = true, + curBlocks = createFetchRequests(curBlocks, address, isLast = true, collectedRemoteRequests).to[ArrayBuffer] curRequestSize = curBlocks.map(_.size).sum } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org