I think a cheap way to repartition to a higher partition count without a shuffle would be valuable too. Right now you can choose whether to execute a shuffle when going down in partition count, but going up in partition count always requires a shuffle. For the use case of producing smaller partitions so that .toLocalIterator is easier on driver memory, no shuffle is actually needed on the increase: rows don't have to move between existing partitions, they only have to be split within them.
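For reference, a minimal sketch of the current behavior being discussed (the RDD, partition counts, and object name are just illustrative): coalesce can lower the partition count without a shuffle, but the only way to raise it today is repartition, which always shuffles, and asking coalesce for more partitions without a shuffle is a no-op.

    import org.apache.spark.{SparkConf, SparkContext}

    object RepartitionBehaviorSketch {
      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("repartition-sketch").setMaster("local[4]"))
        val rdd = sc.parallelize(1 to 1000000, numSlices = 8)

        // Going down: narrow dependency, no shuffle.
        val fewer = rdd.coalesce(4, shuffle = false)

        // Going up: repartition is coalesce(n, shuffle = true), so it always shuffles.
        val more = rdd.repartition(32)

        // Asking coalesce for more partitions without a shuffle silently keeps the current count.
        val unchanged = rdd.coalesce(32, shuffle = false)

        println(Seq(fewer, more, unchanged).map(_.partitions.length)) // List(4, 32, 8)
        sc.stop()
      }
    }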
Filed as https://issues.apache.org/jira/browse/SPARK-5997

On Wed, Feb 18, 2015 at 3:21 PM, Mingyu Kim <m...@palantir.com> wrote:

> Another alternative would be to compress the partition in memory in a
> streaming fashion instead of calling .toArray on the iterator. Would it be
> an easier mitigation to the problem? Or, is it hard to compress the rows
> one by one without materializing the full partition in memory using the
> compression algo Spark uses currently?
>
> Mingyu
>
>
> On 2/18/15, 1:01 PM, "Imran Rashid" <iras...@cloudera.com> wrote:
>
> >This would be pretty tricky to do -- the issue is that right now
> >sparkContext.runJob has you pass in a function from a partition to *one*
> >result object that gets serialized and sent back: Iterator[T] => U, and
> >that idea is baked pretty deep into a lot of the internals, DAGScheduler,
> >Task, Executors, etc.
> >
> >Maybe another possibility worth considering: should we make it easy to go
> >from N partitions to 2N partitions (or any other multiple obviously)
> >without requiring a shuffle? for that matter, you should also be able to
> >go from 2N to N without a shuffle as well. That change is also somewhat
> >involved, though.
> >
> >Both are in theory possible, but I imagine they'd need really compelling
> >use cases.
> >
> >An alternative would be to write your RDD to some other data store (eg,
> >hdfs) which has better support for reading data in a streaming fashion,
> >though you would probably be unhappy with the overhead.
> >
> >
> >On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash <and...@andrewash.com> wrote:
> >
> >> Hi Spark devs,
> >>
> >> I'm creating a streaming export functionality for RDDs and am having
> >> some trouble with large partitions. The RDD.toLocalIterator() call
> >> pulls over a partition at a time to the driver, and then streams the
> >> RDD out from that partition before pulling in the next one. When you
> >> have large partitions though, you can OOM the driver, especially when
> >> multiple of these exports are happening in the same SparkContext.
> >>
> >> One idea I had was to repartition the RDD so partitions are smaller,
> >> but it's hard to know a priori what the partition count should be, and
> >> I'd like to avoid paying the shuffle cost if possible -- I think
> >> repartition to a higher partition count forces a shuffle.
> >>
> >> Is it feasible to rework this so the executor -> driver transfer in
> >> .toLocalIterator is a steady stream rather than a partition at a time?
> >>
> >> Thanks!
> >> Andrew
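For anyone following along, a rough, non-authoritative sketch of the mechanism Imran describes: toLocalIterator runs one job per partition, and because runJob's result function has the shape Iterator[T] => U, the whole partition is materialized (iter.toArray) before being serialized back to the driver, which is exactly why large partitions hurt. The object and method names below are my own, and the runJob overload with allowLocal matches the 1.x API of the time.

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionAtATimeSketch {
      // Roughly what RDD.toLocalIterator does: one job per partition, each job
      // collapsing the partition's Iterator[T] into a single Array[T] that is
      // shipped to the driver in one piece.
      def partitionAtATime[T: ClassTag](rdd: RDD[T]): Iterator[T] = {
        val sc = rdd.sparkContext
        (0 until rdd.partitions.length).iterator.flatMap { p =>
          // runJob's func is Iterator[T] => U, so the partition must be fully
          // materialized on the executor before it can be returned.
          sc.runJob(rdd, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
        }
      }

      def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("toLocalIterator-sketch").setMaster("local[4]"))
        val rdd = sc.parallelize(1 to 1000000, numSlices = 8)
        // The outer iterator is lazy, so only the first partition is pulled over here.
        println(partitionAtATime(rdd).take(5).toList)
        sc.stop()
      }
    }

Under this structure, a no-shuffle split of each partition into smaller pieces would let the same per-partition loop pull smaller chunks without changing runJob's one-result-per-partition contract.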