This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f34898c [SPARK-31034][CORE] ShuffleBlockFetcherIterator should always
create request for last block group
f34898c is described below
commit f34898c5e19c9a35c091eded9652cd5e3d661d19
Author: yi.wu <[email protected]>
AuthorDate: Thu Mar 5 21:31:26 2020 +0800
[SPARK-31034][CORE] ShuffleBlockFetcherIterator should always create
request for last block group
### What changes were proposed in this pull request?
This is a bug fix for #27280. This PR fixes a bug where
`ShuffleBlockFetcherIterator` may fail to create a request for the last block
group.
### Why are the changes needed?
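When (all blocks).sum < `targetRemoteRequestSize`, (all blocks).length >
`maxBlocksInFlightPerAddress`, and (last block group).size <
`maxBlocksInFlightPerAddress`,
`ShuffleBlockFetcherIterator` does not create a request for the last group.
This happens because the undersized last group is put back into `curBlocks` to
be combined with later blocks. During the final flush, however, there are no
later blocks, so the group is never requested. As a result, the reduce task
loses data.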
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated tests.
Closes #27786 from Ngone51/fix_no_request_bug.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2257ce24437f05c417821c02e3e44c77c93f7211)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../storage/ShuffleBlockFetcherIterator.scala | 8 +-
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 91 ++++++++++++++++++----
2 files changed, 78 insertions(+), 21 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index cd4c860..2a0447d 100644
---
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -339,14 +339,14 @@ final class ShuffleBlockFetcherIterator(
+ s"with ${blocks.size} blocks")
}
- def createFetchRequests(): Unit = {
+ def createFetchRequests(isLast: Boolean): Unit = {
val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
curBlocks = new ArrayBuffer[FetchBlockInfo]
if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
createFetchRequest(mergedBlocks)
} else {
mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { blocks =>
- if (blocks.length == maxBlocksInFlightPerAddress) {
+ if (blocks.length == maxBlocksInFlightPerAddress || isLast) {
createFetchRequest(blocks)
} else {
// The last group does not exceed `maxBlocksInFlightPerAddress`.
Put it back
@@ -367,12 +367,12 @@ final class ShuffleBlockFetcherIterator(
// For batch fetch, the actual block in flight should count for merged
block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >=
maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
- createFetchRequests()
+ createFetchRequests(isLast = false)
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
- createFetchRequests()
+ createFetchRequests(isLast = true)
}
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 2090a51..773629c 100644
---
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -433,32 +433,86 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs().size
=== 1)
}
- test("fetch continuous blocks in batch respects maxSize and maxBlocks") {
+ test("fetch continuous blocks in batch should respect maxBytesInFlight") {
val blockManager = mock(classOf[BlockManager])
val localBmId = BlockManagerId("test-client", "test-local-host", 1)
doReturn(localBmId).when(blockManager).blockManagerId
// Make sure remote blocks would return the merged block
- val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
- val remoteBlocks = Seq[BlockId](
+ val remoteBmId1 = BlockManagerId("test-client-1", "test-client-1", 1)
+ val remoteBmId2 = BlockManagerId("test-client-2", "test-client-2", 2)
+ val remoteBlocks1 = (0 until 15).map(ShuffleBlockId(0, 3, _))
+ val remoteBlocks2 = Seq[BlockId](ShuffleBlockId(0, 4, 0),
ShuffleBlockId(0, 4, 1))
+ val mergedRemoteBlocks = Map[BlockId, ManagedBuffer](
+ ShuffleBlockBatchId(0, 3, 0, 3) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 3, 3, 6) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 3, 6, 9) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 3, 9, 12) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 3, 12, 15) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 4, 0, 2) -> createMockManagedBuffer())
+ val transfer = createMockTransfer(mergedRemoteBlocks)
+
+ val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
+ (remoteBmId1, remoteBlocks1.map(blockId => (blockId, 100L, 1))),
+ (remoteBmId2, remoteBlocks2.map(blockId => (blockId, 100L,
1)))).toIterator
+
+ val taskContext = TaskContext.empty()
+ val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+ val iterator = new ShuffleBlockFetcherIterator(
+ taskContext,
+ transfer,
+ blockManager,
+ blocksByAddress,
+ (_, in) => in,
+ 1500,
+ Int.MaxValue,
+ Int.MaxValue,
+ Int.MaxValue,
+ true,
+ false,
+ metrics,
+ true)
+
+ var numResults = 0
+ // After initialize(), there will be 6 FetchRequests. And each of the
first 5 requests
+ // includes 1 merged block which is merged from 3 shuffle blocks. The last
request has 1 merged
+ // block which merged from 2 shuffle blocks. So, only the first 5
requests(5 * 3 * 100 >= 1500)
+ // can be sent. The 6th FetchRequest will hit maxBlocksInFlightPerAddress
so it won't
+ // be sent.
+ verify(transfer, times(5)).fetchBlocks(any(), any(), any(), any(), any(),
any())
+ while (iterator.hasNext) {
+ val (blockId, inputStream) = iterator.next()
+ // Make sure we release buffers when a wrapped input stream is closed.
+ val mockBuf = mergedRemoteBlocks(blockId)
+ verifyBufferRelease(mockBuf, inputStream)
+ numResults += 1
+ }
+ // The 6th request will be sent after next() is called.
+ verify(transfer, times(6)).fetchBlocks(any(), any(), any(), any(), any(),
any())
+ assert(numResults == 6)
+ }
+
+ test("fetch continuous blocks in batch should respect
maxBlocksInFlightPerAddress") {
+ val blockManager = mock(classOf[BlockManager])
+ val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+ doReturn(localBmId).when(blockManager).blockManagerId
+
+ // Make sure remote blocks would return the merged block
+ val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 1)
+ val remoteBlocks = Seq(
ShuffleBlockId(0, 3, 0),
ShuffleBlockId(0, 3, 1),
- ShuffleBlockId(0, 3, 2),
ShuffleBlockId(0, 4, 0),
ShuffleBlockId(0, 4, 1),
- ShuffleBlockId(0, 5, 0),
- ShuffleBlockId(0, 5, 1),
- ShuffleBlockId(0, 5, 2))
+ ShuffleBlockId(0, 5, 0))
val mergedRemoteBlocks = Map[BlockId, ManagedBuffer](
- ShuffleBlockBatchId(0, 3, 0, 3) -> createMockManagedBuffer(),
+ ShuffleBlockBatchId(0, 3, 0, 2) -> createMockManagedBuffer(),
ShuffleBlockBatchId(0, 4, 0, 2) -> createMockManagedBuffer(),
- ShuffleBlockBatchId(0, 5, 0, 3) -> createMockManagedBuffer())
- val transfer = createMockTransfer(mergedRemoteBlocks)
+ ShuffleBlockBatchId(0, 5, 0, 1) -> createMockManagedBuffer())
+ val transfer = createMockTransfer(mergedRemoteBlocks)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
- (remoteBmId, remoteBlocks.map(blockId => (blockId, 1L, 1)))
- ).toIterator
-
+ (remoteBmId, remoteBlocks.map(blockId => (blockId, 100L, 1)))).toIterator
val taskContext = TaskContext.empty()
val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
val iterator = new ShuffleBlockFetcherIterator(
@@ -467,7 +521,7 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
blockManager,
blocksByAddress,
(_, in) => in,
- 35,
+ Int.MaxValue,
Int.MaxValue,
2,
Int.MaxValue,
@@ -475,8 +529,12 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
false,
metrics,
true)
-
var numResults = 0
+ // After initialize(), there will be 2 FetchRequests. First one has 2
merged blocks and each
+ // of them is merged from 2 shuffle blocks, second one has 1 merged block
which is merged from
+ // 1 shuffle block. So only the first FetchRequest can be sent. The second
FetchRequest will
+ // hit maxBlocksInFlightPerAddress so it won't be sent.
+ verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(),
any())
while (iterator.hasNext) {
val (blockId, inputStream) = iterator.next()
// Make sure we release buffers when a wrapped input stream is closed.
@@ -484,8 +542,7 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
verifyBufferRelease(mockBuf, inputStream)
numResults += 1
}
- // The first 2 batch block ids are in the same fetch request as they don't
exceed the max size
- // and max blocks, so 2 requests in total.
+ // The second request will be sent after next() is called.
verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(),
any())
assert(numResults == 3)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]