[
https://issues.apache.org/jira/browse/SPARK-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562386#comment-14562386
]
Rui Li commented on SPARK-7081:
-------------------------------
Hi [~joshrosen], requiring that the dependency have no aggregation or key
ordering seems to prevent lots of shuffles from leveraging this new
optimization, e.g. reduceByKey, sortByKey. Do you have any plans to relax this
limitation?
> Faster sort-based shuffle path using binary processing cache-aware sort
> -----------------------------------------------------------------------
>
> Key: SPARK-7081
> URL: https://issues.apache.org/jira/browse/SPARK-7081
> Project: Spark
> Issue Type: New Feature
> Components: Shuffle, Spark Core
> Reporter: Reynold Xin
> Assignee: Josh Rosen
> Fix For: 1.4.0
>
>
> (Description copied from GitHub):
> This patch introduces a new shuffle manager that enhances the existing
> sort-based shuffle with a new cache-friendly sort algorithm that operates
> directly on binary data. The goals of this patch are to lower memory usage
> and Java object overheads during shuffle and to speed up sorting. It also
> lays groundwork for follow-up patches that will enable end-to-end processing
> of serialized records.
> The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting
> `spark.shuffle.manager=tungsten-sort` in SparkConf.
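> A minimal sketch of the configuration (the `spark.shuffle.manager` key and
> the `tungsten-sort` value come from this description; the Kryo setting just
> reflects the relocatable-serializer requirement listed below):
> ```scala
> import org.apache.spark.{SparkConf, SparkContext}
>
> val conf = new SparkConf()
>   .setAppName("tungsten-sort-demo")
>   .set("spark.shuffle.manager", "tungsten-sort")
>   // Kryo supports relocation of serialized values, one of the
>   // preconditions for the optimized path (see the list below).
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> val sc = new SparkContext(conf)
> ```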
> The new shuffle manager uses directly-managed memory to implement several
> performance optimizations for certain types of shuffles. In cases where the
> new performance optimizations cannot be applied, the new shuffle manager
> delegates to SortShuffleManager to handle those shuffles.
> UnsafeShuffleManager's optimizations will apply when _all_ of the following
> conditions hold (a sketch of the resulting eligibility check follows this
> list):
> - The shuffle dependency specifies no aggregation or output ordering.
> - The shuffle serializer supports relocation of serialized values (this is
> currently supported
> by KryoSerializer and Spark SQL's custom serializers).
> - The shuffle produces fewer than 16777216 output partitions.
> - No individual record is larger than 128 MB when serialized.
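> As a sketch, the eligibility test implied by this list might look as follows
> (the case class and all names here are illustrative, not Spark's exact
> internals; the real check inspects the ShuffleDependency directly):
> ```scala
> // Hypothetical summary of a shuffle dependency's relevant properties.
> case class ShuffleDepInfo(
>     hasAggregation: Boolean,         // e.g. reduceByKey sets an aggregator
>     hasKeyOrdering: Boolean,         // e.g. sortByKey sets a key ordering
>     serializerRelocatable: Boolean,  // true for Kryo and Spark SQL serializers
>     numPartitions: Int)
>
> object UnsafeShuffleEligibility {
>   val MaxPartitions = 16777216  // 2^24; see the pointer-packing sketch below
>
>   // The 128 MB per-record cap in the list above is a property of the
>   // records themselves, so it cannot be checked up front here.
>   def canUseUnsafeShuffle(dep: ShuffleDepInfo): Boolean =
>     !dep.hasAggregation &&
>     !dep.hasKeyOrdering &&
>     dep.serializerRelocatable &&
>     dep.numPartitions < MaxPartitions
> }
> ```
> Shuffles failing any of these checks are delegated to SortShuffleManager, as
> noted above.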
> In addition, extra spill-merging optimizations are automatically applied when
> the shuffle compression codec supports concatenation of serialized streams.
> This is currently supported by Spark's LZF compression codec.
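> The property being relied on can be demonstrated directly. A small sketch,
> assuming the `com.ning.compress-lzf` library that backs Spark's LZF codec:
> two payloads compressed independently and concatenated byte-for-byte still
> decompress to the concatenation of the originals:
> ```scala
> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
> import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
>
> object LzfConcatDemo {
>   private def compress(payload: Array[Byte]): Array[Byte] = {
>     val bytes = new ByteArrayOutputStream()
>     val lzf = new LZFOutputStream(bytes)
>     lzf.write(payload)
>     lzf.close()  // finishes the final LZF chunk
>     bytes.toByteArray
>   }
>
>   def main(args: Array[String]): Unit = {
>     val a = Array.fill[Byte](1000)(1)
>     val b = Array.fill[Byte](1000)(2)
>     // Concatenate the two compressed streams without decompressing them.
>     val concatenated = compress(a) ++ compress(b)
>     val in = new LZFInputStream(new ByteArrayInputStream(concatenated))
>     val out = new ByteArrayOutputStream()
>     val buf = new Array[Byte](4096)
>     var n = in.read(buf)
>     while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
>     assert(out.toByteArray.sameElements(a ++ b))
>   }
> }
> ```
> This is the property that lets the spill merge described below concatenate
> compressed partition blocks instead of decompressing and recompressing them.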
> At a high level, UnsafeShuffleManager's design is similar to Spark's existing
> SortShuffleManager. In sort-based shuffle, incoming records are sorted
> according to their target partition ids, then written to a single map output
> file. Reducers fetch contiguous regions of this file in order to read their
> portion of the map output. In cases where the map output data is too large to
> fit in memory, sorted subsets of the output are spilled to disk and those
> on-disk files are merged to produce the final output file.
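> A toy sketch of that write path (the layout and names are illustrative):
> records are ordered by target partition id and written to one output, while
> an index of offsets marks the contiguous region each reducer should fetch:
> ```scala
> object SortShuffleWriteSketch {
>   // records: (targetPartitionId, serialized record bytes)
>   def writeMapOutput(records: Seq[(Int, Array[Byte])],
>                      numPartitions: Int): (Array[Byte], Array[Long]) = {
>     val out = new java.io.ByteArrayOutputStream()
>     val sorted = records.sortBy(_._1)                 // group by target partition
>     val offsets = new Array[Long](numPartitions + 1)
>     var p = 0
>     for ((pid, bytes) <- sorted) {
>       while (p <= pid) { offsets(p) = out.size(); p += 1 }  // region starts
>       out.write(bytes)
>     }
>     while (p <= numPartitions) { offsets(p) = out.size(); p += 1 }
>     (out.toByteArray, offsets)  // partition p occupies [offsets(p), offsets(p + 1))
>   }
> }
> ```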
> UnsafeShuffleManager optimizes this process in several ways:
> - Its sort operates on serialized binary data rather than Java objects,
> which reduces memory consumption and GC overheads. This optimization requires
> the record serializer to have certain properties to allow serialized records
> to be re-ordered without requiring deserialization. See SPARK-4550, where
> this optimization was first proposed and implemented, for more details.
> - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter)
> that sorts arrays of compressed record pointers and partition ids. By using
> only 8 bytes of space per record in the sorting array, this fits more of the
> array into cache; a sketch of this packing follows the list.
> - The spill merging procedure operates on blocks of serialized records that
> belong to the same partition and does not need to deserialize records during
> the merge.
> - When the spill compression codec supports concatenation of compressed
> data, the spill merge simply concatenates the serialized and compressed spill
> partitions to produce the final output partition. This allows efficient data
> copying methods, like NIO's `transferTo`, to be used and avoids the need to
> allocate decompression or copying buffers during the merge.
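> As a hypothetical sketch of the 8-byte sort entry mentioned above (the bit
> widths are an assumption consistent with the 2^24 partition limit; the names
> are mine, not Spark's):
> ```scala
> object PackedPointerSketch {
>   // The 24 high bits hold the partition id, which is consistent with the
>   // 16777216-partition (2^24) limit in the conditions above.
>   val MaxPartitions: Int = 1 << 24
>
>   def pack(partitionId: Int, recordPointer: Long): Long = {
>     require(partitionId >= 0 && partitionId < MaxPartitions)
>     (partitionId.toLong << 40) | (recordPointer & 0xFFFFFFFFFFL)  // low 40 bits: pointer
>   }
>
>   def partitionId(packed: Long): Int = (packed >>> 40).toInt
>   def recordPointer(packed: Long): Long = packed & 0xFFFFFFFFFFL
> }
> ```
> Because the partition id occupies the high bits, sorting a plain Array[Long]
> of packed values (e.g. with java.util.Arrays.sort) orders records by
> partition id without touching the serialized records themselves.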
> The shuffle read path is unchanged.
> This patch is similar to http://issues.apache.org/jira/browse/SPARK-4550 but
> uses a slightly different implementation. The `unsafe`-based implementation
> featured in this patch lays the groundwork for follow-up patches that will
> enable sorting to operate on serialized data pages that will be prepared by
> Spark SQL's new `unsafe` operators.