I think a cheap way to repartition to a higher partition count without a
shuffle would be valuable too.  Right now you can choose whether to execute
a shuffle when going down in partition count, but going up in partition
count always requires a shuffle.  For the use case of making partitions
smaller so that .toLocalIterator is more efficient, no shuffle is necessary
when increasing the partition count.
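
For reference, the asymmetry as it stands today (a minimal sketch against
the public RDD API; sc is assumed to be an existing SparkContext):

    // Decreasing the partition count: the shuffle is optional.
    val rdd = sc.parallelize(1 to 1000000, 8)
    val fewer = rdd.coalesce(4)                         // no shuffle
    val fewerBalanced = rdd.coalesce(4, shuffle = true) // with shuffle

    // Increasing the partition count: always shuffles today --
    // repartition(n) is just coalesce(n, shuffle = true).
    val more = rdd.repartition(16)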

Filed as https://issues.apache.org/jira/browse/SPARK-5997

On Wed, Feb 18, 2015 at 3:21 PM, Mingyu Kim <m...@palantir.com> wrote:

> Another alternative would be to compress the partition in memory in a
> streaming fashion instead of calling .toArray on the iterator. Would that
> be an easier mitigation of the problem? Or is it hard to compress the rows
> one by one, without materializing the full partition in memory, using the
> compression algo Spark uses currently?
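>
> (For illustration, the kind of streaming compression I mean -- a sketch
> using plain JDK streams rather than whatever codec Spark actually uses,
> and compressPartition is a made-up name:)
>
>     import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>     import java.util.zip.GZIPOutputStream
>
>     // Serialize and compress rows one by one instead of materializing
>     // the whole partition with .toArray first.  Assumes the rows are
>     // java-serializable.
>     def compressPartition[T](rows: Iterator[T]): Array[Byte] = {
>       val bytes = new ByteArrayOutputStream()
>       val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
>       rows.foreach(row => out.writeObject(row))  // one row at a time
>       out.close()
>       bytes.toByteArray
>     }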
>
> Mingyu
>
> On 2/18/15, 1:01 PM, "Imran Rashid" <iras...@cloudera.com> wrote:
>
> >This would be pretty tricky to do -- the issue is that right now
> >SparkContext.runJob has you pass in a function from a partition to *one*
> >result object that gets serialized and sent back: Iterator[T] => U. That
> >idea is baked pretty deep into a lot of the internals: DAGScheduler,
> >Task, the executors, etc.
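> >
> >(The shape in question, simplified from SparkContext -- func produces
> >one U per partition, and the driver gets them all back as one array:)
> >
> >    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]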
> >
> >Maybe another possibility worth considering: should we make it easy to go
> >from N partitions to 2N partitions (or any other multiple, obviously)
> >without requiring a shuffle?  For that matter, you should also be able to
> >go from 2N to N without a shuffle as well.  That change is also somewhat
> >involved, though -- a sketch of the splitting direction is below.
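> >
> >(A rough sketch of what the splitting direction could look like -- the
> >names are made up and this is not something Spark provides.  Note that
> >each parent partition gets recomputed once per child, which is part of
> >why the change is more involved than it looks:)
> >
> >    import scala.reflect.ClassTag
> >    import org.apache.spark.{Partition, TaskContext}
> >    import org.apache.spark.rdd.RDD
> >
> >    case class HalfPartition(index: Int, parent: Partition)
> >      extends Partition
> >
> >    // N -> 2N without a shuffle: each child partition keeps every
> >    // other element of its parent partition.
> >    class SplitRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {
> >      override def getPartitions: Array[Partition] =
> >        Array.tabulate[Partition](prev.partitions.length * 2) { i =>
> >          HalfPartition(i, prev.partitions(i / 2))
> >        }
> >      override def compute(split: Partition, ctx: TaskContext): Iterator[T] = {
> >        val p = split.asInstanceOf[HalfPartition]
> >        // The parent partition is evaluated for both of its children.
> >        prev.iterator(p.parent, ctx).zipWithIndex
> >          .collect { case (x, i) if i % 2 == p.index % 2 => x }
> >      }
> >    }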
> >
> >Both are in theory possible, but I imagine they'd need really compelling
> >use cases.
> >
> >An alternative would be to write your RDD to some other data store (e.g.,
> >HDFS) that has better support for reading data in a streaming fashion,
> >though you would probably be unhappy with the overhead.
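> >
> >(i.e., something along the lines of
> >
> >    rdd.saveAsObjectFile("hdfs:///some/path")  // path is yours to pick
> >
> >and then reading the files back with a plain HDFS client, streaming.)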
> >
> >
> >
> >On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash <and...@andrewash.com> wrote:
> >
> >> Hi Spark devs,
> >>
> >> I'm creating streaming export functionality for RDDs and am having some
> >> trouble with large partitions.  The RDD.toLocalIterator() call pulls
> >> over a partition at a time to the driver, and then streams the RDD out
> >> from that partition before pulling in the next one.  When you have
> >> large partitions, though, you can OOM the driver, especially when
> >> several of these exports are happening in the same SparkContext.
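> >>
> >> (The export loop is essentially the below -- driver memory is bounded
> >> by the largest single partition.  exportRow is a placeholder for our
> >> sink:)
> >>
> >>     // Pulls one partition's rows to the driver at a time.
> >>     rdd.toLocalIterator.foreach(row => exportRow(row))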
> >>
> >> One idea I had was to repartition the RDD so partitions are smaller,
> >> but it's hard to know a priori what the partition count should be, and
> >> I'd like to avoid paying the shuffle cost if possible -- I think
> >> repartitioning to a higher partition count forces a shuffle.
> >>
> >> Is it feasible to rework this so the executor -> driver transfer in
> >> .toLocalIterator is a steady stream rather than a partition at a time?
> >>
> >> Thanks!
> >> Andrew
> >>
>
