[
https://issues.apache.org/jira/browse/SPARK-3350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119742#comment-14119742
]
David Greco commented on SPARK-3350:
------------------------------------
I managed to make it work; the passing code is below. I changed the function
rowToAvroKey to return a function instead. Now it works.
I think it's related to how Spark serialises closures.
Just my .02
Hope it helps.
package com.eligotech.hnavigator.prototypes.spark
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.avro.SchemaBuilder.FieldAssembler
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroOutputFormat, AvroWrapper, AvroJob, AvroKey}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.BooleanType
import org.apache.spark.sql.DoubleType
import org.apache.spark.sql.StructType
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.types.FloatType
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.types.LongType
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.catalyst.types.StructField
import org.apache.spark.{SparkConf, SparkContext}
// Minimal end-to-end repro (SPARK-3350): writes a Spark SQL SchemaRDD out as an
// Avro file using the old "mapred" Avro APIs.
// NOTE(review): `extends App` is discouraged for non-trivial entry points
// (initialization-order pitfalls); kept as-is since this is a repro snippet.
object AvroWriteTestCase extends App {
// Local-mode Spark context for the repro.
val conf = new SparkConf().setAppName("HarpoonSparkTest").setMaster("local")
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
// 100 sample rows: Person("CIAO1", 1) .. Person("CIAO100", 100).
// NOTE(review): this expression is wrapped by the mail client; the original
// source presumably had it on a single line.
val tpeople = sparkContext.parallelize((1 to 100).map(i => Person(s"CIAO$i",
i)))
case class Person(name: String, age: Int)
import sqlContext._
tpeople.registerTempTable("people")
val people = sqlContext.sql("select * from people")
// Returns a CLOSURE converting a Row into an (AvroKey, NullWritable) pair.
// Per the author's note above, returning a function -- so the Avro Schema is
// built inside the task rather than captured by the driver-side closure -- is
// what makes this work; presumably because Avro's Schema is not serializable.
// TODO confirm against the Avro version in use.
def rowToAvroKey(structType: StructType): Row => (AvroKey[GenericRecord],
NullWritable) = (row: Row) =>{
// NOTE(review): the schema is rebuilt for every row. Hoisting this out of
// the closure may reintroduce the serialization NPE this change avoids.
val schema = structTypeToAvroSchema(structType)
val record = new Record(schema)
var i = 0
// Copy each field positionally from the Row into the Avro record.
structType.fields.foreach {
f =>
record.put(f.name, row.apply(i))
i += 1
}
(new AvroKey(record), NullWritable.get())
}
// Maps a Spark SQL StructType to an Avro record schema named "RECORD".
// Every supported primitive type becomes a nullable Avro field with no
// default; any other field type raises IllegalArgumentException.
def structTypeToAvroSchema(structType: StructType): Schema = {
val fieldsAssembler: FieldAssembler[Schema] =
SchemaBuilder.record("RECORD").fields()
structType.fields foreach {
(field: StructField) => field.dataType match {
case IntegerType =>
fieldsAssembler.name(field.name).`type`().nullable().intType().noDefault()
case LongType =>
fieldsAssembler.name(field.name).`type`().nullable().longType().noDefault()
case StringType =>
fieldsAssembler.name(field.name).`type`().nullable().stringType().noDefault()
case FloatType =>
fieldsAssembler.name(field.name).`type`().nullable().floatType().noDefault()
case DoubleType =>
fieldsAssembler.name(field.name).`type`().nullable().doubleType().noDefault()
case BooleanType =>
fieldsAssembler.name(field.name).`type`().nullable().booleanType().noDefault()
// NOTE(review): the string literal below is split across two lines by
// mail wrapping and would not compile as-is; the original source
// presumably had it on one line.
case _ => throw new IllegalArgumentException("StructType with
unhandled type: " + field)
}
}
fieldsAssembler.endRecord()
}
// Saves the SchemaRDD at `path` in Avro format via the old mapred
// AvroOutputFormat, deriving the Avro output schema from the RDD's schema.
def saveAsAvroFile(schemaRDD: SchemaRDD, path: String): Unit = {
val jobConf = new JobConf(schemaRDD.sparkContext.hadoopConfiguration)
val schema = structTypeToAvroSchema(schemaRDD.schema)
AvroJob.setOutputSchema(jobConf, schema)
// Needed for the implicit conversion that adds saveAsHadoopFile to pair RDDs.
import org.apache.spark.SparkContext._
schemaRDD.map(rowToAvroKey(schemaRDD.schema)).
saveAsHadoopFile(path,
classOf[AvroWrapper[GenericRecord]],
classOf[NullWritable],
classOf[AvroOutputFormat[GenericRecord]],
jobConf)
}
saveAsAvroFile(people, "/tmp/people.avro")
}
> Strange anomaly trying to write a SchemaRDD into an Avro file
> -------------------------------------------------------------
>
> Key: SPARK-3350
> URL: https://issues.apache.org/jira/browse/SPARK-3350
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Environment: jdk1.7, macosx
> Reporter: David Greco
> Attachments: AvroWriteTestCase.scala
>
>
> I found a way to automatically save a SchemaRDD in Avro format, similarly to
> what Spark does with Parquet files.
> I attached a test case to this issue. The code fails with a NPE.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]