[ https://issues.apache.org/jira/browse/SPARK-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030014#comment-14030014 ]

Sandy Ryza commented on SPARK-2114:
-----------------------------------

The Spark shuffle currently offers a record-oriented interface.  Moving 
groupByKey and join onto raw data would require something like the following 
changes:
* Change ShuffleFetcher to return an Iterator of ByteBuffers instead of an 
Iterator of deserialized objects.
* Change BlockFetcherIterator accordingly.
* Add a ShuffleRecordFetcher that wraps ShuffleFetcher and returns an Iterator 
of deserialized objects.
* Move ShuffledRDD onto ShuffleRecordFetcher.
* Add a RawShuffledRDD.
* Move groupByKey off of combineByKey and onto RawShuffledRDD.
* Write the length of each key and value before it, probably using 
variable-length coding.
* Add a RawGrouper interface that implements equals and hashCode on serialized 
keys.  In most cases, we would just use the equals and hash of the raw bytes.
* For groupByKey and join, instead of the reduce-side ExternalAppendOnlyMap 
storing ArrayLists of records for each key, we would store growable arrays of 
pointers.  Each would point at a raw record in one of the shuffle block 
ByteBuffers.  This means we would need to keep the shuffle blocks around, 
though we can dispose of them when we spill.
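The length-prefixing step above could look something like the following sketch. This is illustrative only, not Spark code: writeVarInt/readVarInt and the record layout (varint key length, key bytes, varint value length, value bytes) are assumptions about what the format might be.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Write an unsigned varint: 7 bits per byte, high bit set on all but the last.
def writeVarInt(out: DataOutputStream, n0: Int): Unit = {
  var n = n0
  while ((n & ~0x7f) != 0) {
    out.writeByte((n & 0x7f) | 0x80)
    n >>>= 7
  }
  out.writeByte(n)
}

// Read a varint written by writeVarInt.
def readVarInt(in: DataInputStream): Int = {
  var result = 0
  var shift = 0
  var b = in.readByte() & 0xff
  while ((b & 0x80) != 0) {
    result |= (b & 0x7f) << shift
    shift += 7
    b = in.readByte() & 0xff
  }
  result | (b << shift)
}

// Hypothetical record layout: each key and value is preceded by its length,
// so record boundaries can be found without deserializing anything.
def writeRecord(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit = {
  writeVarInt(out, key.length); out.write(key)
  writeVarInt(out, value.length); out.write(value)
}

def readRecord(in: DataInputStream): (Array[Byte], Array[Byte]) = {
  val k = new Array[Byte](readVarInt(in)); in.readFully(k)
  val v = new Array[Byte](readVarInt(in)); in.readFully(v)
  (k, v)
}
```

With this framing, a raw-bytes consumer can skip over records or slice them out of a shuffle block without touching a deserializer.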

I imagine this will shift around a little bit after SPARK-2044.
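To make the RawGrouper idea concrete, here is a hypothetical sketch. RawGrouper is only proposed in this comment, so everything below (the trait, BytesRawGrouper, RawKey, groupRaw) is invented for illustration; it also works on plain byte arrays rather than slices of shuffle-block ByteBuffers.

```scala
import java.util.Arrays

// Hypothetical interface: equality and hashing over serialized keys, so
// grouping can run on raw bytes instead of deserialized objects.
trait RawGrouper {
  def rawEquals(a: Array[Byte], b: Array[Byte]): Boolean
  def rawHash(key: Array[Byte]): Int
}

// The common case: just use the equals and hash of the raw bytes, which is
// correct whenever equal keys always serialize to identical byte sequences.
object BytesRawGrouper extends RawGrouper {
  def rawEquals(a: Array[Byte], b: Array[Byte]): Boolean = Arrays.equals(a, b)
  def rawHash(key: Array[Byte]): Int = Arrays.hashCode(key)
}

// Wrapper that lets a serialized key act as a hash-map key by delegating
// equals/hashCode to the grouper.
final class RawKey(val bytes: Array[Byte], grouper: RawGrouper) {
  override def equals(other: Any): Boolean = other match {
    case that: RawKey => grouper.rawEquals(bytes, that.bytes)
    case _            => false
  }
  override def hashCode: Int = grouper.rawHash(bytes)
}

// Group serialized values by serialized key without deserializing either side.
def groupRaw(records: Seq[(Array[Byte], Array[Byte])],
             grouper: RawGrouper): Map[RawKey, Seq[Array[Byte]]] =
  records.groupBy { case (k, _) => new RawKey(k, grouper) }
    .map { case (key, kvs) => key -> kvs.map(_._2) }
```

In the real design, RawKey would hold a pointer (offset and length) into a retained shuffle-block ByteBuffer rather than its own array, which is what makes keeping the blocks alive until spill necessary.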

> groupByKey and joins on raw data
> --------------------------------
>
>                 Key: SPARK-2114
>                 URL: https://issues.apache.org/jira/browse/SPARK-2114
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Sandy Ryza
>
> For groupByKey and join transformations, Spark tasks on the reduce side 
> deserialize every record into a Java object before calling any user function. 
>  
> This causes all kinds of problems for garbage collection - when aggregating 
> enough data, objects can escape the young gen and trigger full GCs down the 
> line.  Additionally, when records are spilled, they must be serialized and 
> deserialized multiple times.
> 
> It would be helpful to allow aggregations on serialized data - using some 
> sort of RawHasher interface that could implement hashCode and equals for 
> serialized records.  This would also require encoding record boundaries in 
> the serialized format, which I'm not sure we currently do.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
