Repository: spark
Updated Branches:
  refs/heads/master 0985d2c30 -> f6c447f87


Removed code duplication in ShuffleBlockFetcherIterator

Added fetchUpToMaxBytes() to prevent having to update both code blocks when a 
change is made.

Author: Evan Racah <ejra...@gmail.com>

Closes #8514 from eracah/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c447f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c447f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c447f8

Branch: refs/heads/master
Commit: f6c447f87592286a6f58aee5e0b2dc8dcb470d0c
Parents: 0985d2c
Author: Evan Racah <ejra...@gmail.com>
Authored: Wed Sep 2 22:13:18 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:13:37 2015 -0700

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala     | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6c447f8/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a759ceb..0d0448f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
     fetchRequests ++= Utils.randomize(remoteRequests)
 
     // Send out initial requests for blocks, up to our maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     val numFetches = remoteRequests.size - fetchRequests.size
     logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
       case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     result match {
       case FailureFetchResult(blockId, address, e) =>
@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  private def fetchUpToMaxBytes(): Unit = {
+    // Send fetch requests up to maxBytesInFlight
+    while (fetchRequests.nonEmpty &&
+      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+      sendRequest(fetchRequests.dequeue())
+    }
+  }
+
   private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
     blockId match {
       case ShuffleBlockId(shufId, mapId, reduceId) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to