[ 
https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14544422#comment-14544422
 ] 

Josh Rosen commented on SPARK-7081:
-----------------------------------

Also, note that I posted some simple performance benchmarks at 
https://github.com/apache/spark/pull/5868#issuecomment-101837095

> Faster sort-based shuffle path using binary processing cache-aware sort
> -----------------------------------------------------------------------
>
>                 Key: SPARK-7081
>                 URL: https://issues.apache.org/jira/browse/SPARK-7081
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>            Assignee: Josh Rosen
>             Fix For: 1.4.0
>
>
> (Description copied from GitHub):
> This patch introduces a new shuffle manager that enhances the existing 
> sort-based shuffle with a new cache-friendly sort algorithm that operates 
> directly on binary data. The goals of this patch are to lower memory usage 
> and Java object overheads during shuffle and to speed up sorting. It also 
> lays groundwork for follow-up patches that will enable end-to-end processing 
> of serialized records.
> The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting 
> `spark.shuffle.manager=tungsten-sort` in SparkConf.
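As a configuration sketch (assuming Spark 1.4+ with PySpark installed; the manager name and config key are as given above), enabling the new path might look like:

```python
# Configuration sketch: opt in to the tungsten-sort shuffle manager.
# Shuffles that don't meet the optimization conditions below still fall
# back to the regular sort-based shuffle automatically.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("tungsten-sort-demo")
        .set("spark.shuffle.manager", "tungsten-sort"))
sc = SparkContext(conf=conf)
```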
> The new shuffle manager uses directly-managed memory to implement several 
> performance optimizations for certain types of shuffles. In cases where the 
> new performance optimizations cannot be applied, the new shuffle manager 
> delegates to SortShuffleManager to handle those shuffles.
> UnsafeShuffleManager's optimizations will apply when _all_ of the following 
> conditions hold:
>  - The shuffle dependency specifies no aggregation or output ordering.
>  - The shuffle serializer supports relocation of serialized values (this is 
> currently supported
>    by KryoSerializer and Spark SQL's custom serializers).
>  - The shuffle produces fewer than 16,777,216 (2^24) output partitions.
>  - No individual record is larger than 128 MB when serialized.
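The partition-count limit above falls out of the sort-entry encoding: with 24 bits reserved for the partition id, at most 2^24 distinct partitions can be represented. The arithmetic:

```python
# Why the limit is 16777216: a 24-bit field can encode 2**24 values.
PARTITION_ID_BITS = 24
MAX_PARTITIONS = 1 << PARTITION_ID_BITS
print(MAX_PARTITIONS)  # 16777216
```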
> In addition, extra spill-merging optimizations are automatically applied when 
> the shuffle compression codec supports concatenation of serialized streams. 
> This is currently supported by Spark's LZF compression codec.
> At a high-level, UnsafeShuffleManager's design is similar to Spark's existing 
> SortShuffleManager.  In sort-based shuffle, incoming records are sorted 
> according to their target partition ids, then written to a single map output 
> file. Reducers fetch contiguous regions of this file in order to read their 
> portion of the map output. In cases where the map output data is too large to 
> fit in memory, sorted subsets of the output are spilled to disk and those
> on-disk files are merged to produce the final output file.
> UnsafeShuffleManager optimizes this process in several ways:
>  - Its sort operates on serialized binary data rather than Java objects, 
> which reduces memory consumption and GC overheads. This optimization requires 
> the record serializer to have certain properties to allow serialized records 
> to be re-ordered without requiring deserialization.  See SPARK-4550, where 
> this optimization was first proposed and implemented, for more details.
>  - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) 
> that sorts arrays of compressed record pointers and partition ids. By using 
> only 8 bytes of space per record in the sorting array, this fits more of the 
> array into cache.
>  - The spill merging procedure operates on blocks of serialized records that 
> belong to the same partition and does not need to deserialize records during 
> the merge.
>  - When the spill compression codec supports concatenation of compressed 
> data, the spill merge simply concatenates the serialized and compressed spill 
> partitions to produce the final output partition.  This allows efficient data 
> copying methods, like NIO's `transferTo`, to be used and avoids the need to 
> allocate decompression or copying buffers during the merge.
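The packed-pointer sort described in the second bullet can be sketched as follows. This is an illustrative 64-bit layout (24-bit partition id over a 40-bit record address), not Spark's exact PackedRecordPointer field widths:

```python
# Sketch: pack the partition id into the high bits of a 64-bit word so
# that sorting the raw integers orders entries by partition id first.
ADDRESS_BITS = 40
ADDRESS_MASK = (1 << ADDRESS_BITS) - 1

def pack(partition_id, address):
    assert 0 <= partition_id < (1 << 24)
    assert 0 <= address <= ADDRESS_MASK
    return (partition_id << ADDRESS_BITS) | address

def partition_of(packed):
    return packed >> ADDRESS_BITS

entries = [pack(5, 100), pack(1, 200), pack(5, 50), pack(0, 300)]
entries.sort()  # plain integer sort groups entries by partition id
assert [partition_of(e) for e in entries] == [0, 1, 5, 5]
```

Because each sort entry is a single 8-byte word, comparisons never touch the records themselves, which is what makes the sort cache-friendly.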
> The shuffle read path is unchanged.
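When the codec supports stream concatenation, the spill merge for one partition reduces to raw byte copying. A minimal Python stand-in for the `transferTo`-style fast path (illustrative only; Spark does this with NIO channels):

```python
# Sketch: merge one partition's spill segments by concatenating their
# bytes verbatim -- no decompression or copy buffers are allocated
# beyond what the byte copy itself needs.
import os
import shutil
import tempfile

def merge_spills(spill_paths, out_path):
    with open(out_path, "wb") as out:
        for path in spill_paths:
            with open(path, "rb") as spill:
                shutil.copyfileobj(spill, out)

# Tiny demonstration with two fake spill segments.
tmp = tempfile.mkdtemp()
spills = []
for i, chunk in enumerate([b"aaa", b"bbb"]):
    path = os.path.join(tmp, "spill%d" % i)
    with open(path, "wb") as f:
        f.write(chunk)
    spills.append(path)

merged = os.path.join(tmp, "merged")
merge_spills(spills, merged)
assert open(merged, "rb").read() == b"aaabbb"
```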
> This patch is similar to http://issues.apache.org/jira/browse/SPARK-4550 but
> uses a slightly different implementation. The `unsafe`-based implementation 
> featured in this patch lays the groundwork for followup patches that will 
> enable sorting to operate on serialized data pages that will be prepared by 
> Spark SQL's new `unsafe` operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
