[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485582#comment-14485582 ]
Ilya Ganelin commented on SPARK-6780:
-------------------------------------
This code was my attempt to implement this within PythonRDD.scala, but I ran
into run-time reflection issues that I could not resolve.
{code}
/**
 * Output a Python RDD of key-value pairs to any Hadoop file system such that the values within
 * the RDD are written to sub-directories organized by the associated key.
 *
 * Keys and values are converted to suitable output types using either user-specified converters
 * or, if not specified, [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion
 * types `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
 * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
 * this RDD.
 */
def saveAsHadoopFileByKey[K, V, C <: CompressionCodec](
    pyRDD: JavaRDD[Array[Byte]],
    batchSerialized: Boolean,
    path: String,
    outputFormatClass: String,
    keyClass: String,
    valueClass: String,
    keyConverterClass: String,
    valueConverterClass: String,
    confAsMap: java.util.HashMap[String, String],
    compressionCodecClass: String) = {
  val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
  val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
    inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
  val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
  val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
  val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
    new JavaToWritableConverter)
  converted.saveAsHadoopFile(path,
    ClassUtils.primitiveToWrapper(kc),
    ClassUtils.primitiveToWrapper(vc),
    classOf[RDDMultipleTextOutputFormat[K, V]],
    new JobConf(mergedConf),
    codec = codec)
}
{code}
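For reference, the RDDMultipleTextOutputFormat used above is not shown in the
snippet. A minimal sketch of what such a class could look like, following the
MultipleTextOutputFormat approach proposed in SPARK-3533 (the class name and
the key-per-sub-directory layout here are assumptions, not code from this
patch):
{code}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical sketch of the output format referenced above: writes each
// record into a sub-directory named after its key, as proposed in SPARK-3533.
class RDDMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {

  // Suppress the key in the written line so only the value appears in the file.
  override def generateActualKey(key: K, value: V): K =
    null.asInstanceOf[K]

  // Route each record to <path>/<key>/<partition file name>.
  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    key.toString + "/" + name
}
{code}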
> Add saveAsTextFileByKey method for PySpark
> ------------------------------------------
>
> Key: SPARK-6780
> URL: https://issues.apache.org/jira/browse/SPARK-6780
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to
> subdirectories organized by key, as in:
> https://issues.apache.org/jira/browse/SPARK-3533