This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8f93dc2 [SPARK-31259][CORE] Fix log message about fetch request size
in ShuffleBlockFetcherIterator
8f93dc2 is described below
commit 8f93dc2f1dd8bd09d52fd3dc07a4c10e70bd237c
Author: yi.wu <[email protected]>
AuthorDate: Thu Mar 26 09:11:13 2020 -0700
[SPARK-31259][CORE] Fix log message about fetch request size in
ShuffleBlockFetcherIterator
### What changes were proposed in this pull request?
Fix the incorrect log of `curRequestSize`.
### Why are the changes needed?
In batch mode, `curRequestSize` can be the total size of several block
groups. And each group should have its own request size instead of using the
total size.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
This change only affects a log message.
Closes #28028 from Ngone51/fix_curRequestSize.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 33f532a9f201fb9c7895d685b3dce82cf042dc61)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f1a7d88..404e055 100644
---
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -329,9 +329,8 @@ final class ShuffleBlockFetcherIterator(
private def createFetchRequest(
blocks: Seq[FetchBlockInfo],
- address: BlockManagerId,
- curRequestSize: Long): FetchRequest = {
- logDebug(s"Creating fetch request of $curRequestSize at $address "
+ address: BlockManagerId): FetchRequest = {
+ logDebug(s"Creating fetch request of ${blocks.map(_.size).sum} at $address
"
+ s"with ${blocks.size} blocks")
FetchRequest(address, blocks)
}
@@ -339,17 +338,16 @@ final class ShuffleBlockFetcherIterator(
private def createFetchRequests(
curBlocks: Seq[FetchBlockInfo],
address: BlockManagerId,
- curRequestSize: Long,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo]
= {
val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
var retBlocks = Seq.empty[FetchBlockInfo]
if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
- collectedRemoteRequests += createFetchRequest(mergedBlocks, address,
curRequestSize)
+ collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
} else {
mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
- collectedRemoteRequests += createFetchRequest(blocks, address,
curRequestSize)
+ collectedRemoteRequests += createFetchRequest(blocks, address)
} else {
// The last group does not exceed `maxBlocksInFlightPerAddress`. Put
it back
// to `curBlocks`.
@@ -377,14 +375,14 @@ final class ShuffleBlockFetcherIterator(
// For batch fetch, the actual block in flight should count for merged
block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >=
maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
- curBlocks = createFetchRequests(curBlocks, address, curRequestSize,
isLast = false,
+ curBlocks = createFetchRequests(curBlocks, address, isLast = false,
collectedRemoteRequests).to[ArrayBuffer]
curRequestSize = curBlocks.map(_.size).sum
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
- curBlocks = createFetchRequests(curBlocks, address, curRequestSize,
isLast = true,
+ curBlocks = createFetchRequests(curBlocks, address, isLast = true,
collectedRemoteRequests).to[ArrayBuffer]
curRequestSize = curBlocks.map(_.size).sum
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]