[ https://issues.apache.org/jira/browse/SPARK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628030#comment-14628030 ]
Daniel Darabos commented on SPARK-8836:
---------------------------------------
I just noticed this has actually been implemented already in SPARK-2213! Cool!
Too bad it was done as an optimization for SparkSQL and not in Spark Core.
> Sorted join
> -----------
>
> Key: SPARK-8836
> URL: https://issues.apache.org/jira/browse/SPARK-8836
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.4.0
> Reporter: Daniel Darabos
> Priority: Minor
>
> In my [Spark Summit 2015
> presentation|https://spark-summit.org/2015/events/interactive-graph-analytics-with-spark/]
> I touted sorted joins. It would be a shame to talk about how great they are
> and then not try to introduce them into Spark.
> When joining co-partitioned RDDs, the current Spark implementation builds a
> hash map from the contents of one partition and probes it with the items of
> the other partition.
> (https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
> using AppendOnlyMap.)
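> The build-and-probe strategy can be sketched in plain Scala (a simplified
> model only; Spark's actual CoGroupedRDD code uses AppendOnlyMap and handles
> multiple grouped dependencies):

```scala
// Sketch of the hash-based join over two co-partitioned partitions:
// build a map from one side, then probe it with the other side.
def hashJoin[K, A, B](left: Iterator[(K, A)],
                      right: Iterator[(K, B)]): Iterator[(K, (A, B))] = {
  // Build phase: materialize one side into a key -> values map.
  // This is the partition-sized allocation the issue complains about.
  val buildSide: Map[K, Seq[A]] =
    left.toSeq.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
  // Probe phase: stream the other side, looking each key up in the map.
  right.flatMap { case (k, b) =>
    buildSide.getOrElse(k, Seq.empty).map(a => (k, (a, b)))
  }
}
```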
> Another option for lining up the keys from the two partitions is to sort them
> both and then merge. Just doing this may already be a performance improvement.
> What we do instead is sort the partitions up-front and then enjoy the
> benefits across many operations. Our joins are 10x faster than normal Spark
> joins and don't trigger GC: the hash-based join builds a large hashmap (the
> size of the partition), while the sorted join does not allocate any memory.
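> A one-pass merge of two partitions that are already sorted by key might
> look like this (a sketch that assumes unique keys on each side for brevity;
> duplicate keys would need an extra cross-product step):

```scala
// Sort-merge join over two iterators sorted by key. Both inputs are
// consumed in a single forward pass: no hash map is built, so memory
// use stays constant regardless of partition size.
def mergeJoin[K, A, B](left: Iterator[(K, A)], right: Iterator[(K, B)])
                      (implicit ord: Ordering[K]): Iterator[(K, (A, B))] = {
  val l = left.buffered
  val r = right.buffered
  new Iterator[(K, (A, B))] {
    // Advance whichever side has the smaller key until the heads match
    // or one side runs out.
    private def align(): Unit =
      while (l.hasNext && r.hasNext && ord.compare(l.head._1, r.head._1) != 0) {
        if (ord.lt(l.head._1, r.head._1)) l.next() else r.next()
      }
    def hasNext: Boolean = { align(); l.hasNext && r.hasNext }
    def next(): (K, (A, B)) = {
      val (k, a) = l.next()
      (k, (a, r.next()._2))
    }
  }
}
```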
> The sorted partitions benefit other operations too, such as distinct, where
> we likewise avoid building a hashmap. (I think the logic is similar to sort-based
> shuffle, just at a later stage of the process.)
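> For distinct on a sorted partition, the hashmap can be replaced by a single
> comparison with the previously emitted element, e.g.:

```scala
// Distinct over an iterator that is already sorted: equal elements are
// adjacent, so it suffices to drop anything equal to the last kept element.
def sortedDistinct[T](sorted: Iterator[T]): Iterator[T] = {
  var last: Option[T] = None
  sorted.filter { x =>
    val keep = !last.contains(x)
    if (keep) last = Some(x)
    keep
  }
}
```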
> Our implementation is based on zipPartitions, and this is entirely workable:
> we have a custom RDD subclass (SortedRDD) that overrides a number of
> methods, plus an implicit class that adds a toSortedRDD method to
> pair-RDDs.
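> The enrichment pattern itself looks roughly like this (a hypothetical,
> non-Spark sketch: SortedSeq stands in for the SortedRDD subclass, and the
> names are illustrative, not the reporter's actual code):

```scala
// SortedSeq plays the role of SortedRDD: a wrapper whose type records
// the "sorted by key" invariant.
final case class SortedSeq[K, V](elems: Seq[(K, V)])

object SortedOps {
  // Implicit class adding a toSortedSeq method, analogous to how
  // toSortedRDD is added to pair-RDDs.
  implicit class PairOps[K, V](self: Seq[(K, V)]) {
    // Sort once up front; every later operation can rely on the invariant.
    def toSortedSeq(implicit ord: Ordering[K]): SortedSeq[K, V] =
      SortedSeq(self.sortBy(_._1))
  }
}
```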
> But I think integrating this into Spark could take it a step further. What
> we have not investigated are the cases where the sorting could be skipped.
> For example, when an RDD comes out of a sort-based shuffle, its partitions
> will already be sorted, right? So even if the user never asks for the
> partitions to be sorted, they can become so, and the faster sorted
> implementations of join, distinct, etc. could kick in automatically. This
> would speed up applications without any change to their code.
> Instead of a subclass it would probably be best to do this with a simple
> "hasSortedPartitions" variable in the RDD. Then perhaps operations could have
> a "preservesPartitionOrder" parameter, like it is done with "partitioner" and
> "preservesPartitioning" now. (For example filter(), mapValues(), join(), and
> distinct() all keep the partition sorted.)
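> As a toy model of that flag-propagation idea (hypothetical names; none of
> this is actual Spark API):

```scala
// Toy model of the proposal: the RDD carries a hasSortedPartitions flag,
// and each operation declares whether it preserves partition order, so
// the flag survives order-preserving steps and is dropped otherwise.
final case class ToyRDD[T](data: Seq[T], hasSortedPartitions: Boolean) {
  def mapOp[U](f: T => U, preservesPartitionOrder: Boolean): ToyRDD[U] =
    ToyRDD(data.map(f), hasSortedPartitions && preservesPartitionOrder)
}
```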
> What do you think about all this?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]