[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485582#comment-14485582 ]
Ilya Ganelin commented on SPARK-6780:
-------------------------------------

This code was my attempt to implement this within PythonRDD.scala, but I ran into run-time reflection issues that I could not solve.

{code}
/**
 * Output a Python RDD of key-value pairs to any Hadoop file system such that the values within
 * the RDD are written to sub-directories organized by the associated key.
 *
 * Keys and values are converted to suitable output types using either user-specified converters
 * or, if not specified, [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion
 * types `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
 * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
 * this RDD.
 */
def saveAsHadoopFileByKey[K, V, C <: CompressionCodec](
    pyRDD: JavaRDD[Array[Byte]],
    batchSerialized: Boolean,
    path: String,
    outputFormatClass: String,
    keyClass: String,
    valueClass: String,
    keyConverterClass: String,
    valueConverterClass: String,
    confAsMap: java.util.HashMap[String, String],
    compressionCodecClass: String) = {
  val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
  val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
    inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
  val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
  val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
  val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
    new JavaToWritableConverter)
  converted.saveAsHadoopFile(path,
    ClassUtils.primitiveToWrapper(kc),
    ClassUtils.primitiveToWrapper(vc),
    classOf[RDDMultipleTextOutputFormat[K, V]],
    new JobConf(mergedConf),
    codec = codec)
}
{code}

> Add saveAsTextFileByKey method for PySpark
> ------------------------------------------
>
>                 Key: SPARK-6780
>                 URL: https://issues.apache.org/jira/browse/SPARK-6780
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>            Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to
> subdirectories organized by key, as in:
> https://issues.apache.org/jira/browse/SPARK-3533

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
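For context on the `RDDMultipleTextOutputFormat[K, V]` class the snippet above passes to `saveAsHadoopFile`: the usual way to get per-key sub-directories (the approach SPARK-3533 describes) is to subclass Hadoop's `MultipleTextOutputFormat`. The sketch below is a hypothetical reconstruction of what such a class could look like, not the actual implementation from the comment; the class name is taken from the snippet, and the overridden methods are the standard `org.apache.hadoop.mapred.lib.MultipleOutputFormat` hooks.

{code}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical sketch: route each record into a sub-directory named
// after its key, e.g. key "foo" -> <path>/foo/part-00000.
class RDDMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {

  // Prefix the leaf file name with the key so records land in per-key dirs.
  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    key.toString + "/" + name

  // Suppress the key in the output records; it is already encoded in the path.
  override def generateActualKey(key: K, value: V): K =
    null.asInstanceOf[K]
}
{code}

With such a class on the classpath, the generic-type reflection issue the comment mentions may stem from `classOf[RDDMultipleTextOutputFormat[K, V]]`: the type parameters are erased at run time, so the class literal cannot carry the inferred `kc`/`vc` types.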