Merge pull request #130 from aarondav/shuffle

Memory-optimized shuffle file consolidation

Reduces the per-block memory overhead of shuffle file consolidation from >300 
bytes to 8 bytes (one primitive Long). Verified with a profiler: with 1 million 
shuffle blocks, the net overhead was ~8,400,000 bytes.

Although the memory-optimized implementation incurs extra CPU overhead, the 
shuffle phase in this test was only about 2% slower, while the reduce phase was 
40% faster, compared to running without any shuffle file consolidation.

This is accomplished by replacing the map from ShuffleBlockId to FileSegment 
(i.e., from block id to where the block is located), which had high overhead 
due to being a gigantic, timestamped, concurrent map, with a more 
space-efficient structure. Namely, the following are introduced (I have omitted 
the word "Shuffle" from some names for clarity):
**ShuffleFile** - there is one ShuffleFile per consolidated shuffle file on 
disk. We store an array of offsets into the physical shuffle file for each 
ShuffleMapTask that wrote into the file. This is sufficient to reconstruct 
FileSegments for mappers that are in the file.
**FileGroup** - contains a set of ShuffleFiles, one per reducer, that a MapTask 
can use to write its output. There is one FileGroup created per _concurrent_ 
MapTask. The FileGroup contains an array of the mapIds that have been written 
to all files in the group. The positions of elements in this array map directly 
onto the positions in each ShuffleFile's offsets array.
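
The two structures above can be sketched roughly as follows. This is an 
illustration only, not the code from the pull request: the names `Segment`, 
`SketchShuffleFile`, `SketchFileGroup`, `recordWrite`, `segmentAt`, 
`recordMapOutput` and `indexOf` are hypothetical, and the sketch assumes that a 
block's length is the gap to the next offset (or to the end of the file for the 
last block). It uses `ArrayBuffer` for brevity, which boxes its elements and so 
would not by itself reach the 8-bytes-per-block figure.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a FileSegment: a byte range within one file.
final case class Segment(offset: Long, length: Long)

// One consolidated file on disk for a single reducer (hypothetical sketch).
final class SketchShuffleFile {
  // offsets(i) is the byte position where the i-th mapper's block begins.
  private val offsets = ArrayBuffer.empty[Long]
  private var fileLength = 0L

  // Record that a mapper appended a block of `numBytes` bytes to the file.
  def recordWrite(numBytes: Long): Unit = {
    offsets += fileLength
    fileLength += numBytes
  }

  // Assumption: a block ends where the next one starts, or at end of file.
  def segmentAt(i: Int): Segment = {
    val start = offsets(i)
    val end = if (i + 1 < offsets.length) offsets(i + 1) else fileLength
    Segment(start, end - start)
  }
}

// One group per concurrent map task: one file per reducer, plus the list of
// mapIds whose positions line up with each file's offsets.
final class SketchFileGroup(numReducers: Int) {
  val files: Array[SketchShuffleFile] =
    Array.fill(numReducers)(new SketchShuffleFile)
  private val mapIds = ArrayBuffer.empty[Int]

  // Record that `mapId` wrote bytesPerReducer(r) bytes to reducer r's file.
  def recordMapOutput(mapId: Int, bytesPerReducer: Array[Long]): Unit = {
    require(bytesPerReducer.length == numReducers)
    mapIds += mapId
    for (r <- 0 until numReducers) files(r).recordWrite(bytesPerReducer(r))
  }

  // Position of `mapId` in this group, or -1 if it never wrote here.
  def indexOf(mapId: Int): Int = mapIds.indexOf(mapId)
}
```

Because every map task in a group writes to every file in that group, a single 
position returned by `indexOf` is valid for all of the group's files, which is 
why only one Long per block needs to be stored.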

In order to locate the FileSegment associated with a BlockId, we have another 
structure which maps each reducer to the set of ShuffleFiles that were created 
for it. (There will be as many ShuffleFiles per reducer as there are 
FileGroups.) To look up a given ShuffleBlockId (shuffleId, reducerId, mapId), we 
thus search through all ShuffleFiles associated with that reducer.

As a time optimization, we ensure that FileGroups are only reused for MapTasks 
with monotonically increasing mapIds. This allows us to perform a binary search 
to locate a mapId inside a group, and also enables potential future 
optimization (based on the usual monotonic access order).
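
A minimal sketch of that lookup, under stated assumptions: `SortedMapIds`, 
`GroupLookup`, `append`, `positionOf` and `find` are hypothetical names, the 
mapIds within a group are assumed to be strictly increasing, and the groups are 
simply tried one after another. The real implementation may differ.

```scala
import java.util.Arrays

// Hypothetical sketch: the mapIds written to one file group, kept ascending
// in an unboxed Int array.
final class SortedMapIds(initialCapacity: Int = 8) {
  private var ids = new Array[Int](math.max(1, initialCapacity))
  private var size = 0

  // Accept only increasing mapIds so the array stays sorted.
  def append(mapId: Int): Unit = {
    require(size == 0 || mapId > ids(size - 1),
      "mapIds must increase within a group")
    if (size == ids.length) ids = Arrays.copyOf(ids, size * 2)
    ids(size) = mapId
    size += 1
  }

  // Binary search over the filled prefix; None if this group never saw mapId.
  def positionOf(mapId: Int): Option[Int] = {
    val i = Arrays.binarySearch(ids, 0, size, mapId)
    if (i >= 0) Some(i) else None
  }
}

// Try each group in turn; returns (group index, position within the group).
final class GroupLookup(groups: Seq[SortedMapIds]) {
  def find(mapId: Int): Option[(Int, Int)] =
    groups.iterator.zipWithIndex
      .map { case (g, gi) => g.positionOf(mapId).map(pos => (gi, pos)) }
      .collectFirst { case Some(hit) => hit }
}
```

With G groups and at most M mapIds per group, a lookup in this sketch costs 
O(G log M) comparisons instead of a scan over every mapId.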


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7a26104a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7a26104a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7a26104a

Branch: refs/heads/master
Commit: 7a26104ab7cb492b347ba761ef1f17ca1b9078e4
Parents: b5dc339 1ba11b1
Author: Reynold Xin <[email protected]>
Authored: Mon Nov 4 17:54:06 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Nov 4 17:54:06 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  23 +--
 .../org/apache/spark/storage/BlockManager.scala |  10 +-
 .../spark/storage/BlockObjectWriter.scala       |  15 +-
 .../apache/spark/storage/DiskBlockManager.scala |  49 +----
 .../org/apache/spark/storage/DiskStore.scala    |   4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 189 +++++++++++++++----
 .../spark/storage/StoragePerfTester.scala       |  10 +-
 .../org/apache/spark/util/MetadataCleaner.scala |   2 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |   6 +
 .../spark/util/collection/PrimitiveVector.scala |  51 +++++
 .../spark/storage/DiskBlockManagerSuite.scala   |  84 +++++++++
 11 files changed, 333 insertions(+), 110 deletions(-)
----------------------------------------------------------------------

