Address Reynold's comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8703898d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8703898d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8703898d

Branch: refs/heads/master
Commit: 8703898d3f2c6b6e08b3ef91da67876589aba184
Parents: 3ca5230
Author: Aaron Davidson <[email protected]>
Authored: Sun Nov 3 00:34:53 2013 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala     | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8703898d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d1e3074..57b1a28 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,7 +45,7 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
  * per reducer (this set of files is called a ShuffleFileGroup).
  *
 * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
@@ -57,11 +57,13 @@ trait ShuffleBlocks {
  *   - bucketId: The id of the output partition (i.e., reducer id)
 *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
 *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
  *
 * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
- * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
- * destined for the block's reducer.
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each
+ * block stored in that file. In order to find the location of a shuffle block, we search all
+ * ShuffleFiles destined for the block's reducer.
  *
  */
 private[spark]
@@ -98,18 +100,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
+  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
     initializeShuffleMetadata(shuffleId, numBuckets)
 
     new ShuffleBlocks {
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
-        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          if (consolidateShuffleFiles) {
+        var fileGroup: ShuffleFileGroup = null
+        val writers = if (consolidateShuffleFiles) {
+          fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
-          } else {
+          }
+        } else {
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             val blockFile = blockManager.diskBlockManager.getFile(blockId)
             blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
           }
@@ -142,8 +148,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   }
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
-    if (!consolidateShuffleFiles) { return null }
-
     val pool = shuffleToFileGroupPoolMap(shuffleId)
     var fileGroup = pool.getUnusedFileGroup()
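
----------------------------------------------------------------------

As a minimal sketch of the offset-list scheme described in the updated doc
comment (simplified, hypothetical names; only FileSegment's (file, offset,
length) shape comes from the commit itself), a shuffle file can recover a
block's FileSegment from two consecutive offsets instead of storing a full
ShuffleBlockId -> FileSegment map:

import java.io.File
import scala.collection.mutable.ArrayBuffer

case class FileSegment(file: File, offset: Long, length: Long)

// Hypothetical, simplified stand-in for a ShuffleFile: it keeps one Long per
// appended block rather than a full blockId -> segment mapping.
class SimpleShuffleFile(val file: File) {
  // blockOffsets(i) is the byte offset at which the i-th block begins; the
  // initial 0 marks the start of the file.
  private val blockOffsets = ArrayBuffer[Long](0L)

  // Record that a block has been appended, ending at the current file length.
  def recordMapOutput(): Unit = blockOffsets += file.length()

  // Derive the i-th block's segment from two consecutive offsets.
  def getFileSegment(index: Int): Option[FileSegment] = {
    if (index < 0 || index + 1 >= blockOffsets.length) None
    else {
      val offset = blockOffsets(index)
      Some(FileSegment(file, offset, blockOffsets(index + 1) - offset))
    }
  }
}

Finding a shuffle block then means asking each of the (few) files destined
for that block's reducer for the segment, which is what the comment means by
searching all ShuffleFiles for the block's reducer.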
 
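The fileId pool behavior noted in the comment (only one task at a time owns a
particular fileId, and the id is returned to a pool when the task finishes)
can be illustrated with a queue-backed pool. This is an assumed sketch for
illustration, not Spark's actual pool class:

import java.util.concurrent.ConcurrentLinkedQueue

// Assumed illustration of a file-group pool: acquire() hands out an unused
// group (creating one if none is free), release() returns it for reuse.
class FileGroupPool[T](newGroup: () => T) {
  private val unused = new ConcurrentLinkedQueue[T]()

  def acquire(): T = Option(unused.poll()).getOrElse(newGroup())

  def release(group: T): Unit = { unused.add(group); () }
}

A map task would acquire() a group when its writers are created and release()
it once the task finishes, so concurrently running tasks never share a fileId.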
