Address Reynold's comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8703898d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8703898d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8703898d

Branch: refs/heads/master
Commit: 8703898d3f2c6b6e08b3ef91da67876589aba184
Parents: 3ca5230
Author: Aaron Davidson <[email protected]>
Authored: Sun Nov 3 00:34:53 2013 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8703898d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d1e3074..57b1a28 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,7 +45,7 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
  * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
@@ -57,11 +57,13 @@ trait ShuffleBlocks {
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *     time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
  *
  * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
- * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
- * destined for the block's reducer.
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each
+ * block stored in that file. In order to find the location of a shuffle block, we search all
+ * ShuffleFiles destined for the block's reducer.
  *
  */
 private[spark]
@@ -98,18 +100,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
+  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
     initializeShuffleMetadata(shuffleId, numBuckets)
 
     new ShuffleBlocks {
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
-        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          if (consolidateShuffleFiles) {
+        var fileGroup: ShuffleFileGroup = null
+        val writers = if (consolidateShuffleFiles) {
+          fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
-          } else {
+          }
+        } else {
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             val blockFile = blockManager.diskBlockManager.getFile(blockId)
             blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
           }
@@ -142,8 +148,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   }
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
-    if (!consolidateShuffleFiles) { return null }
-
     val pool = shuffleToFileGroupPoolMap(shuffleId)
     var fileGroup = pool.getUnusedFileGroup()
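
----------------------------------------------------------------------

Note: the space-efficient metadata scheme described in the updated doc comment is easy to miss inside the diff, so here is a minimal standalone sketch of the idea. FileSegment matches the 3-tuple named in the comment; OffsetIndexedFile, addBlock, and segment are hypothetical names used for illustration only, not Spark's actual API:

import java.io.File
import scala.collection.mutable.ArrayBuffer

// The (file, offset, length) 3-tuple described in the doc comment.
case class FileSegment(file: File, offset: Long, length: Long)

// Hypothetical stand-in for a ShuffleFile: rather than storing one full
// FileSegment per ShuffleBlockId, keep a single Long offset per block
// written to the file.
class OffsetIndexedFile(val file: File) {
  private val offsets = ArrayBuffer[Long]()

  // Record that a new block begins at `offset` (blocks are appended in order).
  def addBlock(offset: Long): Unit = offsets += offset

  // Recover the i-th block's segment: its length is the distance to the next
  // block's offset, or to the end of the file for the last block.
  def segment(i: Int): FileSegment = {
    val start = offsets(i)
    val end = if (i + 1 < offsets.length) offsets(i + 1) else file.length()
    FileSegment(file, start, end - start)
  }
}

Locating a shuffle block then reduces to probing the index of each ShuffleFile destined for that block's reducer, which is the search step the comment refers to.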

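Similarly, the "only one task at a time owns a particular fileId" rule in the comment amounts to a checkout/return pool. A minimal sketch, assuming a generic group type; FileGroupPool, acquire, and release are invented names (the real pool is reached through shuffleToFileGroupPoolMap in the code above):

import java.util.concurrent.ConcurrentLinkedQueue

// A task checks a group out (so no two concurrent tasks share one) and
// returns it when it finishes, letting later tasks append to the same files.
class FileGroupPool[G](newGroup: () => G) {
  private val unused = new ConcurrentLinkedQueue[G]()

  // Reuse an idle group if one exists; otherwise create a fresh one.
  def acquire(): G = Option(unused.poll()).getOrElse(newGroup())

  // Called when the owning task completes.
  def release(group: G): Unit = unused.add(group)
}

This is also why the patch can drop the early "return null" from getUnusedFileGroup: after the refactoring, the pool is consulted only from the consolidateShuffleFiles branch of acquireWriters.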