[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699504#comment-14699504 ]
Silas Davis commented on SPARK-3533:
------------------------------------
I'd like to suggest that this issue be reopened. Writing partitions of a dataset to
separate files based on keys is a common use case, and it is supported by Hadoop
and Cascading, for example.
DataFrameWriter has partitionBy, but it is only supported for Parquet, and it does
not cover cases where you want to work with plain RDDs (for example, when using
specific Avro classes, or when transforming with map, mapPartitions, or
combineByKey, which take you out of DataFrame land).
I have a working implementation based on the Hadoop 2+ MultipleOutputs class.
The basic idea is to wrap an underlying OutputFormat in an OutputFormat class
derived from a MultipleOutputsFormat class, which maintains an instance of
MultipleOutputs and uses it to write each record out based on its key. Here is
the gist:
https://gist.github.com/silasdavis/d1d1f1f7ab78249af462
I've included tests and helper functions for completeness, but the meat of the
implementation is in the first 100 lines. You can also see how it is meant to be
used by looking at the saveAsMultipleAvroFiles code:
https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L287
It would be useful to get some comments on the general idea. I've tried to use
as much of the Hadoop machinery as possible, as PairRDDFunctions does. This
means no existing Spark code has needed to change, but a similar approach could
be taken to incorporate MultipleOutputs into the saveAsNewAPIHadoopDataset
method.
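To make the per-key bookkeeping behind the MultipleOutputs approach concrete, here is a minimal Python sketch of what one task does: lazily open one writer per key, reuse it for later records with the same key, and close every writer when the task ends. KeyedOutputs, write_partition, and the <prefix>/<key>/part-NNNNN naming are hypothetical illustrations, not Hadoop or Spark APIs; the sketch writes to the local filesystem only and uses keys verbatim as directory names (no sanitising).

```python
import os
import tempfile
from typing import Dict, Iterable, List, TextIO, Tuple


class KeyedOutputs:
    """Hypothetical per-task analogue of the MultipleOutputs pattern.

    Lazily opens one text writer per key at <prefix>/<key>/part-NNNNN and
    caches it, so every record with that key in this task goes to one file.
    Keys are used verbatim as directory names (no sanitising).
    """

    def __init__(self, prefix: str, partition_index: int) -> None:
        self.prefix = prefix
        self.part_name = f"part-{partition_index:05d}"
        self._writers: Dict[str, TextIO] = {}

    def write(self, key: object, value: str) -> None:
        key_dir = str(key)
        writer = self._writers.get(key_dir)
        if writer is None:
            # First record for this key in this task: create its directory and file.
            directory = os.path.join(self.prefix, key_dir)
            os.makedirs(directory, exist_ok=True)
            writer = open(os.path.join(directory, self.part_name), "w", encoding="utf-8")
            self._writers[key_dir] = writer
        writer.write(f"{value}\n")

    def keys(self) -> List[str]:
        # Keys this task has opened a writer for, in sorted order.
        return sorted(self._writers)

    def close(self) -> None:
        # Close every writer this task opened, then forget them.
        for writer in self._writers.values():
            writer.close()
        self._writers.clear()


def write_partition(
    records: Iterable[Tuple[object, str]], partition_index: int, prefix: str
) -> List[str]:
    """Write one partition's (key, value) records; return the keys it touched."""
    outputs = KeyedOutputs(prefix, partition_index)
    try:
        for key, value in records:
            outputs.write(key, value)
        return outputs.keys()
    finally:
        outputs.close()


# Usage: write one partition of the example data into a temporary directory.
with tempfile.TemporaryDirectory() as prefix:
    touched = write_partition([("N", "Nick"), ("N", "Nancy"), ("B", "Bob")], 0, prefix)
    with open(os.path.join(prefix, "N", "part-00000"), encoding="utf-8") as f:
        n_lines = f.read().splitlines()

print(touched, n_lines)  # ['B', 'N'] ['Nick', 'Nancy']
```

In PySpark this could in principle be driven with rdd.mapPartitionsWithIndex(lambda i, it: write_partition(it, i, prefix)).collect(), provided prefix is on storage every executor can reach. Unlike the OutputFormat route described above, it bypasses Hadoop's output committer, so a retried or speculative task could leave duplicate or partial files; that is one reason to reuse the Hadoop machinery instead.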
> Add saveAsTextFileByKey() method to RDDs
> ----------------------------------------
>
> Key: SPARK-3533
> URL: https://issues.apache.org/jira/browse/SPARK-3533
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, Spark Core
> Affects Versions: 1.1.0
> Reporter: Nicholas Chammas
>
> Users often have a single RDD of key-value pairs that they want to save to
> multiple locations based on the keys.
> For example, say I have an RDD like this:
> {code}
> >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
> ...                     'Frankie']).keyBy(lambda x: x[0])
> >>> a.collect()
> [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
> >>> a.keys().distinct().collect()
> ['B', 'F', 'N']
> {code}
> Now I want to write the RDD out to different paths depending on the keys, so
> that I have one output directory per distinct key. Each output directory
> could potentially have multiple {{part-}} files, one per RDD partition.
> So the output would look something like:
> {code}
> /path/prefix/B [/part-1, /part-2, etc]
> /path/prefix/F [/part-1, /part-2, etc]
> /path/prefix/N [/part-1, /part-2, etc]
> {code}
> Though it may be possible to do this with some combination of
> {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the
> {{MultipleTextOutputFormat}} output format class, it isn't straightforward.
> It's not clear if it's even possible at all in PySpark.
> Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs
> that makes it easy to save RDDs out to multiple locations at once.
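The directory layout requested in the issue can be planned without touching a filesystem. This sketch assumes the example RDD happens to be split into the two partitions shown (the real split depends on how sc.parallelize partitions the list) and uses Spark's usual part-00000 file naming instead of the illustrative part-1; layout_by_key is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[object, str]


def layout_by_key(partitions: List[Iterable[Record]]) -> Dict[str, Dict[str, List[str]]]:
    """Plan the per-key output layout without touching the filesystem.

    Returns {key_dir: {part_file: [values...]}}. A key directory only gets a
    part file for the partitions that actually contain that key.
    """
    layout: Dict[str, Dict[str, List[str]]] = defaultdict(dict)
    for index, records in enumerate(partitions):
        part_name = f"part-{index:05d}"
        for key, value in records:
            layout[str(key)].setdefault(part_name, []).append(value)
    return dict(layout)


# Hypothetical split of the example RDD into two partitions.
partitions = [
    [("N", "Nick"), ("N", "Nancy"), ("B", "Bob")],
    [("B", "Ben"), ("F", "Frankie")],
]

for key, parts in sorted(layout_by_key(partitions).items()):
    print(key, parts)
# B {'part-00000': ['Bob'], 'part-00001': ['Ben']}
# F {'part-00001': ['Frankie']}
# N {'part-00000': ['Nick', 'Nancy']}
```

Planning the layout this way makes explicit that "one part file per RDD partition" holds only for partitions that contain the key: here F gets a single file because only the second partition holds an F record.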