Add documentation and address other comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1592adfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1592adfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1592adfa

Branch: refs/heads/master
Commit: 1592adfa259860494353babfb48c80b7d1087379
Parents: 7d44dec
Author: Aaron Davidson <[email protected]>
Authored: Sat Nov 2 00:19:04 2013 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockManager.scala | 12 ++---
 .../spark/storage/ShuffleBlockManager.scala     | 49 ++++++++++++--------
 2 files changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1592adfa/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index bde3d1f..fcd2e97 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,14 +55,12 @@ private[spark] class DiskBlockManager(shuffleManager: 
ShuffleBlockManager, rootD
    * Otherwise, we assume the Block is mapped to a whole file identified by 
the BlockId directly.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    if (blockId.isShuffle) {
-      val segment = 
shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
-      if (segment.isDefined) { return segment.get }
-      // If no special mapping found, assume standard block -> file mapping...
+    if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
+      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+    } else {
+      val file = getFile(blockId.name)
+      new FileSegment(file, 0, file.length())
     }
-
-    val file = getFile(blockId.name)
-    new FileSegment(file, 0, file.length())
   }
 
   def getFile(filename: String): File = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1592adfa/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d718c87..d1e3074 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,18 +45,24 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle 
task gets one writer
- * per reducer.
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle 
task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, 
multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle 
file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing 
to its shuffle files,
- * it releases them for another task.
+ * blocks are aggregated into the same file. There is one "combined shuffle 
file" (ShuffleFile) per
+ * reducer per concurrently executing shuffle task. As soon as a task finishes 
writing to its
+ * shuffle files, it releases them for another task.
  * Regarding the implementation of this feature, shuffle files are identified 
by a 3-tuple:
  *   - shuffleId: The unique id given to the entire shuffle stage.
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." 
Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when 
the task finishes.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than 
simply mapping
+ * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of 
offsets for each block
+ * stored in that file. In order to find the location of a shuffle block, we 
search all ShuffleFiles
+ * destined for the block's reducer.
+ *
  */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
@@ -124,9 +130,9 @@ class ShuffleBlockManager(blockManager: BlockManager) 
extends Logging {
     }
   }
 
-  def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
+  private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
     val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new 
ShuffleFileGroupPool())
-    if (prev == None) {
+    if (!prev.isDefined) {
       val reducerToFilesMap = new 
Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
       for (reducerId <- 0 until numBuckets) {
         reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
@@ -142,6 +148,8 @@ class ShuffleBlockManager(blockManager: BlockManager) 
extends Logging {
     var fileGroup = pool.getUnusedFileGroup()
 
     // If we reuse a file group, ensure we maintain mapId monotonicity.
+    // This means we may create extra ShuffleFileGroups if we're trying to run 
a map task
+    // that is out-of-order with respect to its mapId (which may happen when 
failures occur).
     val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
     while (fileGroup != null && fileGroup.maxMapId >= mapId) {
       fileGroupsToReturn += fileGroup
@@ -170,21 +178,19 @@ class ShuffleBlockManager(blockManager: BlockManager) 
extends Logging {
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
-   * If we have no special mapping, None will be returned.
+   * This function should only be called if shuffle file consolidation is 
enabled, as it is
+   * an error condition if we don't find the expected block.
    */
-  def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = {
+  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
     // Search all files associated with the given reducer.
     // This process is O(m log n) for m threads and n mappers. Could be 
sweetened to "likely" O(m).
-    if (consolidateShuffleFiles) {
-      val filesForReducer = 
shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
-      for (file <- filesForReducer) {
-        val segment = file.getFileSegmentFor(id.mapId)
-        if (segment != None) { return segment }
-      }
-
-      logInfo("Failed to find shuffle block: " + id)
+    val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
+    for (file <- filesForReducer) {
+      val segment = file.getFileSegmentFor(id.mapId)
+      if (segment != None) { return segment.get }
     }
-    None
+
+    throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
@@ -204,6 +210,10 @@ class ShuffleBlockManager(blockManager: BlockManager) 
extends Logging {
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: 
Array[ShuffleFile]) {
+  /**
+   * Contains the set of mappers that have written to this file group, in the 
same order as they
+   * have written to their respective files.
+   */
   private val mapIds = new PrimitiveVector[Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
@@ -238,8 +248,9 @@ class ShuffleFile(val file: File) {
   /**
    * Consecutive offsets of blocks into the file, ordered by position in the 
file.
    * This ordering allows us to compute block lengths by examining the 
following block offset.
+   * blockOffsets(i) contains the offset for the mapper in 
shuffleFileGroup.mapIds(i).
    */
-  val blockOffsets = new PrimitiveVector[Long]()
+  private val blockOffsets = new PrimitiveVector[Long]()
 
   /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
   private var shuffleFileGroup : ShuffleFileGroup = _

Reply via email to