[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485582#comment-14485582 ]

Ilya Ganelin commented on SPARK-6780:
-------------------------------------

This code was my attempt to implement this within PythonRDD.scala, but I ran
into run-time reflection issues that I could not solve.

{code}
  /**
   * Output a Python RDD of key-value pairs to any Hadoop file system such that the values within
   * the rdd are written to sub-directories organized by the associated key.
   *
   * Keys and values are converted to suitable output types using either user specified converters
   * or, if not specified, [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion
   * types `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
   * this RDD.
   */
  def saveAsHadoopFileByKey[K, V, C <: CompressionCodec](
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      path: String,
      outputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String],
      compressionCodecClass: String) = {
    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
      new JavaToWritableConverter)

    converted.saveAsHadoopFile(path,
      ClassUtils.primitiveToWrapper(kc),
      ClassUtils.primitiveToWrapper(vc),
      classOf[RDDMultipleTextOutputFormat[K, V]],
      new JobConf(mergedConf),
      codec = codec)
  }

{code}
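
For context, saveAsHadoopFileByKey references an RDDMultipleTextOutputFormat that is not shown above. Here is a minimal sketch of what such a class could look like, assuming the MultipleTextOutputFormat approach from SPARK-3533; the class body below is illustrative only and not part of the attempt above.

{code}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Illustrative sketch: route each record to a sub-directory named after its key.
class RDDMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {

  // Write records under <output path>/<key>/part-XXXXX.
  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    new Path(key.toString, name).toString

  // Drop the key from the written output so only the value appears in each file.
  override def generateActualKey(key: K, value: V): K =
    null.asInstanceOf[K]
}
{code}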

> Add saveAsTextFileByKey method for PySpark
> ------------------------------------------
>
>                 Key: SPARK-6780
>                 URL: https://issues.apache.org/jira/browse/SPARK-6780
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>            Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to 
> subdirectories organized by key, as in:
> https://issues.apache.org/jira/browse/SPARK-3533


