[ 
https://issues.apache.org/jira/browse/SPARK-3350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119742#comment-14119742
 ] 

David Greco commented on SPARK-3350:
------------------------------------

I managed to make it work; the passing code is below. I changed the function 
rowToAvroKey so that it returns a function instead, and now it works. 
I think it's related to how Spark serialises closures.
Just my .02
Hope it helps.

package com.eligotech.hnavigator.prototypes.spark

import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.avro.SchemaBuilder.FieldAssembler
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroOutputFormat, AvroWrapper, AvroJob, AvroKey}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.BooleanType
import org.apache.spark.sql.DoubleType
import org.apache.spark.sql.StructType
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types.FloatType
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.types.LongType
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.catalyst.types.StructField
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Test case for SPARK-3350: saves a SchemaRDD as an Avro file through the
 * Hadoop `mapred` output API.
 *
 * The entry point is an explicit `main` rather than `extends App`, so no
 * driver state lives in delayed-initialised fields of this object.
 */
object AvroWriteTestCase {

  /**
   * Sample row type registered as the "people" table.
   *
   * Kept at object level rather than inside `main`: the implicit
   * RDD-to-SchemaRDD conversion needs a TypeTag for it, which a case class
   * local to a method would not provide.
   */
  final case class Person(name: String, age: Int)

  /**
   * Builds 100 sample `Person` rows, registers them as the temp table
   * "people", selects them back and writes the result to `/tmp/people.avro`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HarpoonSparkTest").setMaster("local")
    // Named `sc` so it cannot clash with members pulled in by `import sqlContext._`.
    val sc = new SparkContext(conf)
    try {
      val tpeople = sc.parallelize((1 to 100).map(i => Person(s"CIAO$i", i)))

      val sqlContext = new SQLContext(sc)
      import sqlContext._
      tpeople.registerTempTable("people")

      val people = sqlContext.sql("select * from people")
      saveAsAvroFile(people, "/tmp/people.avro")
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }

  /**
   * Returns a function that converts a `Row` into the `(key, value)` pair
   * written by `saveAsAvroFile`: an `AvroKey` wrapping a generic record, and
   * the `NullWritable` singleton.
   *
   * The Avro schema is derived inside the returned function, so that function
   * closes over `structType` only and never over an Avro `Schema` instance.
   *
   * @param structType schema describing the rows the returned function receives
   * @return a converter that copies each field of a row, by position, into a
   *         record keyed by the field's name
   */
  def rowToAvroKey(structType: StructType): Row => (AvroKey[GenericRecord], NullWritable) =
    (row: Row) => {
      val schema = structTypeToAvroSchema(structType)
      val record = new Record(schema)
      structType.fields.zipWithIndex.foreach { case (field, index) =>
        record.put(field.name, row(index))
      }
      (new AvroKey[GenericRecord](record), NullWritable.get())
    }

  /**
   * Translates a Spark SQL `StructType` into an Avro record schema named
   * "RECORD". Every field becomes a nullable Avro field without a default.
   *
   * Handled field types: Integer, Long, String, Float, Double and Boolean.
   *
   * @param structType the Spark SQL schema to translate
   * @return the equivalent Avro schema
   * @throws IllegalArgumentException if a field has any other data type
   */
  def structTypeToAvroSchema(structType: StructType): Schema = {
    val emptyRecord: FieldAssembler[Schema] = SchemaBuilder.record("RECORD").fields()
    structType.fields
      .foldLeft(emptyRecord) { (assembler, field) =>
        // A def, so the builder is only started once a supported type has matched.
        def nullableField = assembler.name(field.name).`type`().nullable()
        field.dataType match {
          case IntegerType => nullableField.intType().noDefault()
          case LongType    => nullableField.longType().noDefault()
          case StringType  => nullableField.stringType().noDefault()
          case FloatType   => nullableField.floatType().noDefault()
          case DoubleType  => nullableField.doubleType().noDefault()
          case BooleanType => nullableField.booleanType().noDefault()
          case _ =>
            throw new IllegalArgumentException("StructType  with unhandled type: " + field)
        }
      }
      .endRecord()
  }

  /**
   * Writes `schemaRDD` to `path` with `AvroOutputFormat`.
   *
   * The output schema is derived from the RDD's own schema and set on a
   * `JobConf` created from the RDD's Hadoop configuration.
   *
   * @param schemaRDD the rows to save
   * @param path      destination passed to `saveAsHadoopFile`
   */
  def saveAsAvroFile(schemaRDD: SchemaRDD, path: String): Unit = {
    val structType = schemaRDD.schema
    val jobConf = new JobConf(schemaRDD.sparkContext.hadoopConfiguration)
    AvroJob.setOutputSchema(jobConf, structTypeToAvroSchema(structType))
    import org.apache.spark.SparkContext._
    schemaRDD
      .map(rowToAvroKey(structType))
      .saveAsHadoopFile(
        path,
        classOf[AvroWrapper[GenericRecord]],
        classOf[NullWritable],
        classOf[AvroOutputFormat[GenericRecord]],
        jobConf)
  }

}


> Strange anomaly trying to write a SchemaRDD into an Avro file
> -------------------------------------------------------------
>
>                 Key: SPARK-3350
>                 URL: https://issues.apache.org/jira/browse/SPARK-3350
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>         Environment: jdk1.7, macosx
>            Reporter: David Greco
>         Attachments: AvroWriteTestCase.scala
>
>
> I found a way to automatically save a SchemaRDD in Avro format, similar to 
> what Spark does with Parquet files.
> I attached a test case to this issue. The code fails with an NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to