[ https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-7081:
------------------------------

Cross-posting [a comment from my 
PR|https://github.com/apache/spark/pull/5868#issuecomment-98888713]:

Based on some additional discussions, I think that we should use a specialized 
sorter implementation specific to sort-based shuffle and design a separate 
sorter for more general-purpose record sorting.  By using a specialized sorter, 
we can benefit from several performance optimizations that would be difficult 
to implement in a more general-purpose sorter:

- We may not need a full 32 bits to record the partition id: since shuffles are 
unlikely to have more than, say, 10 million partitions, we can encode the 
partition id in fewer bits and pack it into a single word whose remaining bits 
store the record pointer.  This works because we can choose a reasonable upper 
bound on our addressable memory (say 1 terabyte) and can save further address 
bits due to word-alignment.  This should give us excellent cache-locality 
benefits, since the sort array will only require 8 bytes of space per record.
- Because sort-shuffle's sort keys are partition ids, we can expect to 
encounter long runs of records with the same sort key in the sorted file; this 
may not be the case in a more general-purpose sort, where all of the sort keys 
might be distinct.  This allows the external sorter to use a specialized merge 
procedure that operates on runs of records with the same key rather than on 
individual records.
- Ignoring the spilling/merging case for a moment, the sort shuffle writer only 
needs to know individual records' lengths during the in-memory sort: we need to 
know a record's length in order to copy it to the sorted output file, but once 
a partition's records are adjacent in the output file we no longer need to know 
their individual lengths.  This is a consequence of the fact that the sorted 
data will be consumed by a serializer that knows how to identify record 
boundaries, building on our assumption that the serializer supports reordering 
of serialized records in its serialization stream.
- If we go further and assume that our IO compression codec supports 
concatenation of compressed data (which [Snappy seems to 
support|https://snappy.googlecode.com/svn/trunk/framing_format.txt]), then we 
can implement an IO-efficient merging procedure for external sort.  If we store 
an index file for each spill file, identifying the offsets of each partition 
within the sorted file, then the merge sort procedure can simply traverse these 
indices and concatenate the partitions' serialized data without interpreting 
it. This would let us use methods like `transferTo` to implement copying 
without requiring data to be buffered in the JVM, sidestepping the complexity 
of managing IO buffers during the merge.
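The pointer-packing idea in the first bullet can be sketched as follows. The exact field widths here (24-bit partition id, 40-bit record address) are illustrative assumptions, not a final format:

```java
public class PackedPointer {
    // Illustrative layout (an assumption for this sketch):
    //   high 24 bits: partition id (covers ~16.7M partitions, comfortably
    //                 above the ~10M upper bound discussed above)
    //   low 40 bits:  record address (1 TB of addressable memory)
    static final int ADDRESS_BITS = 40;
    static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;

    // Pack a partition id and a record address into one 8-byte word.
    static long pack(int partitionId, long address) {
        return ((long) partitionId << ADDRESS_BITS) | (address & ADDRESS_MASK);
    }

    // Recover the partition id from the high bits.
    static int partitionId(long packed) {
        return (int) (packed >>> ADDRESS_BITS);
    }

    // Recover the record address from the low bits.
    static long address(long packed) {
        return packed & ADDRESS_MASK;
    }

    public static void main(String[] args) {
        long packed = pack(123456, 42L);
        System.out.println(partitionId(packed));  // prints 123456
        System.out.println(address(packed));      // prints 42
    }
}
```

Because the partition id occupies the high bits and the packed values stay non-negative, numerically sorting the packed longs orders records by partition id, so the sort array really does need only 8 bytes per record.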
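The IO-efficient concatenation merge in the last bullet might look roughly like this toy sketch: each "spill" file is already grouped by partition, a hypothetical in-memory index records each partition's (offset, length) within its spill, and the merge uses `transferTo` to copy segments without buffering record data on the JVM heap:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ConcatMerge {

    // Copy one partition's byte range from a spill file to the output channel
    // via transferTo, so the bytes never pass through JVM heap buffers.
    static void copySegment(FileChannel out, Path spill, long offset, long length)
            throws IOException {
        try (FileChannel in = FileChannel.open(spill, StandardOpenOption.READ)) {
            long pos = offset, remaining = length;
            while (remaining > 0) {
                long n = in.transferTo(pos, remaining, out);
                pos += n;
                remaining -= n;
            }
        }
    }

    // Demo: two toy spill files, each grouped by partition, merged by simply
    // concatenating every spill's segment for partition 0, then partition 1.
    static byte[] demo() {
        try {
            Path s1 = Files.createTempFile("spill1", ".bin");
            Path s2 = Files.createTempFile("spill2", ".bin");
            Path out = Files.createTempFile("merged", ".bin");
            Files.write(s1, "a1b1".getBytes());  // partition 0: "a1", partition 1: "b1"
            Files.write(s2, "a2b2".getBytes());  // partition 0: "a2", partition 1: "b2"
            long[][] idx1 = {{0, 2}, {2, 2}};    // per-partition (offset, length)
            long[][] idx2 = {{0, 2}, {2, 2}};
            try (FileChannel o = FileChannel.open(out, StandardOpenOption.WRITE)) {
                for (int p = 0; p < 2; p++) {
                    copySegment(o, s1, idx1[p][0], idx1[p][1]);
                    copySegment(o, s2, idx2[p][0], idx2[p][1]);
                }
            }
            return Files.readAllBytes(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new String(demo()));  // prints "a1a2b1b2"
    }
}
```

The sketch assumes the segments can be byte-concatenated as-is, which is exactly where the serializer-reordering and compression-concatenation assumptions above come in.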

In light of this, I'm going to work on refactoring my external sorting branch 
to perform these optimizations and will update this pull request with those 
changes.  I'm going to remove the SPARK-7081 and SPARK-7078 JIRA links, since 
those JIRAs seem to be concerned with a more general-purpose record sorter for 
use in SQL joins.

> Faster sort-based shuffle path using binary processing cache-aware sort
> -----------------------------------------------------------------------
>
>                 Key: SPARK-7081
>                 URL: https://issues.apache.org/jira/browse/SPARK-7081
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>            Assignee: Josh Rosen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
