[
https://issues.apache.org/jira/browse/SPARK-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030014#comment-14030014
]
Sandy Ryza commented on SPARK-2114:
-----------------------------------
The Spark shuffle currently offers a record-oriented interface. Moving
groupByKey and join onto raw data would require something like the following
changes:
* Change ShuffleFetcher to return an Iterator of ByteBuffers instead of an
Iterator of deserialized objects.
* Change BlockFetcherIterator accordingly.
* Add a ShuffleRecordFetcher that wraps ShuffleFetcher and returns an Iterator
of deserialized objects.
* Move ShuffledRDD onto ShuffleRecordFetcher.
* Add a RawShuffledRDD.
* Move groupByKey off of combineByKey and onto RawShuffledRDD.
* Write the length of each key and value before it, probably using
variable-length coding.
* Add a RawGrouper interface that implements equals and hashCode on serialized
keys. In most cases, we would just use the equals and hash of the raw bytes.
* For groupByKey and join, instead of the reduce-side ExternalAppendOnlyMap
storing an ArrayList of records for each key, we would store growable arrays
of pointers, each pointing at a raw record in one of the shuffle block
ByteBuffers. This means we would need to keep the shuffle blocks around,
though we could dispose of them when we spill.
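The length-prefix framing could look something like the sketch below: each key and value is preceded by its length, written with unsigned base-128 variable-length coding. The class and method names here are illustrative only, not proposed Spark APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of length-prefixed record framing with varint coding.
public class VarLenRecords {
    // Write an unsigned int as 7-bit groups; high bit = "more bytes follow".
    public static void writeVarInt(ByteBuffer buf, int value) {
        while ((value & ~0x7f) != 0) {
            buf.put((byte) ((value & 0x7f) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    public static int readVarInt(ByteBuffer buf) {
        int result = 0;
        int shift = 0;
        byte b = buf.get();
        while ((b & 0x80) != 0) {
            result |= (b & 0x7f) << shift;
            shift += 7;
            b = buf.get();
        }
        return result | ((b & 0x7f) << shift);
    }

    // A record is [varint keyLen][key bytes][varint valueLen][value bytes].
    public static void writeRecord(ByteBuffer buf, byte[] key, byte[] value) {
        writeVarInt(buf, key.length);
        buf.put(key);
        writeVarInt(buf, value.length);
        buf.put(value);
    }
}
```

Small lengths cost a single byte this way, which matters when most shuffle keys and values are short.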
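The RawGrouper idea and the pointer-based grouping could be sketched as follows. Everything here is hypothetical (RawGrouper, KeyPtr, groupRaw are made-up names, and the sketch uses fixed 4-byte length prefixes rather than varints for simplicity); the point is that the hash map's equals/hashCode run on serialized key bytes, and values are tracked as (offset, length) pointers into the shuffle block, never deserialized.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: grouping on raw bytes via a RawGrouper.
public class RawGrouping {

    // Equality and hashing on serialized keys.
    interface RawGrouper {
        boolean rawEquals(ByteBuffer a, int aOff, int aLen,
                          ByteBuffer b, int bOff, int bLen);
        int rawHash(ByteBuffer buf, int off, int len);
    }

    // Default grouper: compare and hash the raw key bytes directly.
    static final RawGrouper BYTES_GROUPER = new RawGrouper() {
        public boolean rawEquals(ByteBuffer a, int aOff, int aLen,
                                 ByteBuffer b, int bOff, int bLen) {
            if (aLen != bLen) return false;
            for (int i = 0; i < aLen; i++) {
                if (a.get(aOff + i) != b.get(bOff + i)) return false;
            }
            return true;
        }
        public int rawHash(ByteBuffer buf, int off, int len) {
            int h = 1;
            for (int i = 0; i < len; i++) h = 31 * h + buf.get(off + i);
            return h;
        }
    };

    // A pointer to a serialized key; equals/hashCode delegate to the
    // grouper, so the hash map never deserializes the key.
    static final class KeyPtr {
        final ByteBuffer buf; final int off; final int len;
        final RawGrouper grouper;
        KeyPtr(ByteBuffer buf, int off, int len, RawGrouper grouper) {
            this.buf = buf; this.off = off; this.len = len; this.grouper = grouper;
        }
        @Override public int hashCode() { return grouper.rawHash(buf, off, len); }
        @Override public boolean equals(Object o) {
            if (!(o instanceof KeyPtr)) return false;
            KeyPtr k = (KeyPtr) o;
            return grouper.rawEquals(buf, off, len, k.buf, k.off, k.len);
        }
    }

    // Group value pointers (offset, length) by raw key. For this sketch the
    // block is laid out as [4-byte keyLen][key][4-byte valLen][value]...
    static Map<KeyPtr, List<int[]>> groupRaw(ByteBuffer block, RawGrouper grouper) {
        Map<KeyPtr, List<int[]>> groups = new HashMap<>();
        int pos = 0;
        while (pos < block.limit()) {
            int keyLen = block.getInt(pos); pos += 4;
            KeyPtr key = new KeyPtr(block, pos, keyLen, grouper); pos += keyLen;
            int valLen = block.getInt(pos); pos += 4;
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(new int[]{pos, valLen});
            pos += valLen;
        }
        return groups;
    }
}
```

Because the KeyPtrs and value pointers reference the shuffle block directly, the block must stay alive as long as the map does, which is exactly the lifetime constraint noted above.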
I imagine this will shift around a little bit after SPARK-2044.
> groupByKey and joins on raw data
> --------------------------------
>
> Key: SPARK-2114
> URL: https://issues.apache.org/jira/browse/SPARK-2114
> Project: Spark
> Issue Type: New Feature
> Components: Shuffle, Spark Core
> Affects Versions: 1.0.0
> Reporter: Sandy Ryza
>
> For groupByKey and join transformations, Spark tasks on the reduce side
> deserialize every record into a Java object before calling any user function.
>
> This causes all kinds of problems for garbage collection - when aggregating
> enough data, objects can escape the young gen and trigger full GCs down the
> line. Additionally, when records are spilled, they must be serialized and
> deserialized multiple times.
> It would be helpful to allow aggregations on serialized data - using some
> sort of RawHasher interface that could implement hashCode and equals for
> serialized records. This would also require encoding record boundaries in
> the serialized format, which I'm not sure we currently do.
--
This message was sent by Atlassian JIRA
(v6.2#6252)