Repository: spark Updated Branches: refs/heads/master 0985d2c30 -> f6c447f87
Removed code duplication in ShuffleBlockFetcherIterator Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made. Author: Evan Racah <ejra...@gmail.com> Closes #8514 from eracah/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c447f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c447f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c447f8 Branch: refs/heads/master Commit: f6c447f87592286a6f58aee5e0b2dc8dcb470d0c Parents: 0985d2c Author: Evan Racah <ejra...@gmail.com> Authored: Wed Sep 2 22:13:18 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed Sep 2 22:13:37 2015 -0700 ---------------------------------------------------------------------- .../storage/ShuffleBlockFetcherIterator.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6c447f8/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a759ceb..0d0448f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator( fetchRequests ++= Utils.randomize(remoteRequests) // Send out initial requests for blocks, up to our maxBytesInFlight - while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) - } + fetchUpToMaxBytes() val numFetches = remoteRequests.size - fetchRequests.size logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) @@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator( case _ => } // Send fetch requests up to maxBytesInFlight - while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) - } + fetchUpToMaxBytes() result match { case FailureFetchResult(blockId, address, e) => @@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator( } } + private def fetchUpToMaxBytes(): Unit = { + // Send fetch requests up to maxBytesInFlight + while (fetchRequests.nonEmpty && + (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + sendRequest(fetchRequests.dequeue()) + } + } + private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = { blockId match { case ShuffleBlockId(shufId, mapId, reduceId) => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org