[
https://issues.apache.org/jira/browse/SPARK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254107#comment-14254107
]
Imran Rashid commented on SPARK-3655:
-------------------------------------
I think secondary sort is a great addition, thanks for working on this [~koert].
But the discussions about TraversableOnce vs. Iterable have been bugging me,
making me feel that perhaps there ought to be a different API, and I think I've
figured out why. I'd like to make a very different proposal for how we expose
this.
Rather than having some function take an unsorted RDD, do the shuffling, and
then give you exactly one view over the result, shouldn't there be a type for
the sorted RDD itself, which then lets you call any of the different views on
it?
We could create SortedRDD as a subclass of RDD. It would have the property
that its data is partitioned by X and sorted by (X, Y). All the usual RDD
functions would exist, but e.g. mapPartitions would just have the additional
property that you're iterating over elements in sorted order. And it could have
all the other utility functions you like as well, e.g. foldLeftByKey,
groupByKeys, etc., which could all be built on top of mapPartitions.
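To illustrate, here is a hedged sketch in plain Scala (no Spark dependency;
foldLeftByKey is the name proposed above, not an existing Spark API) of how
such a per-key fold could be layered on a partition iterator that is already
sorted by key. Because same-key records are contiguous, only the current key's
accumulator is ever held in memory:

```scala
// Sketch only: fold the values of each key, assuming the input iterator
// is already sorted by key (the property SortedRDD would guarantee).
def foldLeftByKey[K, V, B](iter: Iterator[(K, V)], zero: B)(
    f: (B, V) => B): Iterator[(K, B)] = new Iterator[(K, B)] {
  private val buffered = iter.buffered
  def hasNext: Boolean = buffered.hasNext
  def next(): (K, B) = {
    val key = buffered.head._1
    var acc = zero
    // consume the contiguous run of values for this key
    while (buffered.hasNext && buffered.head._1 == key) {
      acc = f(acc, buffered.next()._2)
    }
    (key, acc)
  }
}
```

e.g. foldLeftByKey(Iterator("a" -> 1, "a" -> 2, "b" -> 3), 0)(_ + _) yields
("a", 3) then ("b", 3), streaming one key at a time.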
I think explicitly tracking the idea of a sorted RDD buys us a few nice things:
1) at the most basic API level (mapPartitions), we don't get stuck in debates
about TraversableOnce vs. Iterable, whether groups need to fit in memory or
not, etc. mapPartitions gives you an iterator, which implies nothing is held in
memory, and which gives us the flexibility to later switch to implementations
that don't have the same requirements for buffering things in memory.
(Though for now we could still add the utility functions which do require more
memory.)
2) Spark can do operations on sorted data even if it's not doing the sorting
itself. E.g., if you read a sorted file from HDFS (or from any other datastore
for that matter), you shouldn't need to force Spark to sort the data *again*
just to get access to the utility functions which rely on sorting. Right now
this logic would need to live at the application level, but this would be the
first step toward integrating it more tightly into Spark itself.
(sort of related to SPARK-1061)
3) I've always felt that the need to pull the grouping key out into the first
element of a tuple is a little clunky -- we can do away with that. The X & Y
for partitioning & sorting could be specified by arbitrary functions. E.g., say
you have some case class MyData(name: String, value: Int, count: Long); it's a
nuisance to say
rdd.groupBy(_.name).map{case (name, records) => records.map{case MyData(n2, v, c) => ...}}
or
rdd.map{x => x.name -> x}.groupByKeyAndSortValues(Ordering.by{x => (x.name, x.value)}).map{case (name, records) => ...}
I'd prefer
val sortedRdd: SortedRDD = rdd.groupAndSort(_.name, _.value)
and then getting to do any of:
sortedRdd.foldByKey(0){case (prev, next) => ...}
or
sortedRdd.mapGroups{case (name, records) => ...}
or
sortedRdd.mapPartitions{itr => ...}
(Again, note that sortedRdd doesn't have to come from a sort done by Spark; it
could come directly from HDFS if the data was written out correctly, or from
any other input source with the right properties.)
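As a hedged sketch of how the mapGroups view could be built on top of a
key-sorted partition iterator (mapGroups and MyData are the names used in this
proposal, not an existing Spark API; this is plain Scala with no Spark
dependency): contiguous runs of the same key are collected into one group, so
only one group at a time needs to fit in memory -- this is one of the "util
functions which do require more mem" mentioned in point 1:

```scala
// Sketch only: group contiguous same-key runs of a key-sorted iterator.
case class MyData(name: String, value: Int, count: Long)

def mapGroups[K, T, R](iter: Iterator[T], key: T => K)(
    f: (K, Seq[T]) => R): Iterator[R] = new Iterator[R] {
  private val buffered = iter.buffered
  def hasNext: Boolean = buffered.hasNext
  def next(): R = {
    val k = key(buffered.head)
    val group = scala.collection.mutable.ArrayBuffer[T]()
    // buffer the contiguous run of records sharing this key
    while (buffered.hasNext && key(buffered.head) == k) {
      group += buffered.next()
    }
    f(k, group.toSeq)
  }
}
```

e.g. given records sorted by name, mapGroups(records, (d: MyData) => d.name)
{(name, recs) => ...} hands each name together with its records, without ever
materializing more than one group.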
> Support sorting of values in addition to keys (i.e. secondary sort)
> -------------------------------------------------------------------
>
> Key: SPARK-3655
> URL: https://issues.apache.org/jira/browse/SPARK-3655
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 1.1.0, 1.2.0
> Reporter: koert kuipers
> Assignee: Koert Kuipers
> Priority: Minor
>
> Now that spark has a sort based shuffle, can we expect a secondary sort soon?
> There are some use cases where getting a sorted iterator of values per key is
> helpful.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)