I am trying to serialize CSV data into Parquet format using an Avro schema (Avro-backed) and then read it back into Hive tables.
The serialization itself succeeds with the following sample code snippet (sample code to serialize one single record):
    import java.io.File;
    import java.io.IOException;
    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroSchemaConverter;
    import org.apache.parquet.avro.AvroWriteSupport;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.MessageType;

    public class AvroParquetConverter {

        public static void main(String[] args) throws IOException {
            Schema avroSchema = new Schema.Parser().parse(new File("schema.avsc"));
            GenericRecord myrecord = new GenericData.Record(avroSchema);
            String outputFilename = "/home/jai/sample1000-snappy.parquet";
            Path outputPath = new Path(outputFilename);

            // Convert the Avro schema to a Parquet schema and set up the writer
            MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
            AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
            CompressionCodecName compressionCodecSnappy = CompressionCodecName.SNAPPY;
            int blockSize = 256 * 1024 * 1024;
            int pageSize = 64 * 1024;

            ParquetWriter parquetWriterSnappy = new ParquetWriter(outputPath,
                    writeSupport, compressionCodecSnappy, blockSize, pageSize);

            myrecord.put("name", "Abhijeet1");
            myrecord.put("pid", 20);
            myrecord.put("favorite_number", 22);

            // Encode the decimal as its two's-complement unscaled value,
            // scaled to match the schema (scale = 6)
            BigDecimal bdecimal = new BigDecimal("13.5").setScale(6);
            BigInteger bi = bdecimal.unscaledValue();
            byte[] barray = bi.toByteArray();
            ByteBuffer byteBuffer = ByteBuffer.allocate(barray.length);
            byteBuffer.put(barray);
            byteBuffer.rewind();
            myrecord.put("price", byteBuffer);

            parquetWriterSnappy.write(myrecord);
            parquetWriterSnappy.close();
        }
    }
I have also tried the decimal-to-ByteBuffer conversion with the following statement:

    ByteBuffer.wrap(bdecimal.unscaledValue().toByteArray());
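Both variants should end up holding the same two's-complement unscaled bytes; a minimal, standalone JDK-only sketch of that comparison (not part of the failing job):

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ByteBufferCheck {
        public static void main(String[] args) {
            BigDecimal bdecimal = new BigDecimal("13.5").setScale(6);
            byte[] barray = bdecimal.unscaledValue().toByteArray();

            // Variant 1: allocate, put, rewind (as in the converter above)
            ByteBuffer allocated = ByteBuffer.allocate(barray.length);
            allocated.put(barray);
            allocated.rewind();

            // Variant 2: wrap the array directly
            ByteBuffer wrapped = ByteBuffer.wrap(bdecimal.unscaledValue().toByteArray());

            // Expected output: true
            System.out.println(Arrays.equals(allocated.array(), wrapped.array()));
        }
    }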
Following is the Avro schema file (schema.avsc):

    {
        "namespace": "avropoc",
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string", "default": "null"},
            {"name": "favorite_number", "type": "int", "default": 0},
            {"name": "pid", "type": "int", "default": 0},
            {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 15, "scale": 6}, "default": 0}
        ]
    }
I also tried the following modification to the schema:

    {"name": "price", "type": "bytes", "logicalType": "decimal", "precision": 15, "scale": 6, "default": 0}
And I am creating the Hive table as follows:

    create external table avroparquet1
    ( name string, favorite_number int,
      pid int, price DECIMAL(15,6))
    STORED AS PARQUET;
But when I run a query on the decimal field price, I get the following error message:
> Failed with exception
> java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable
> cannot be cast to org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
This looks like a Parquet/Avro/Hive issue where Hive is not able to deserialize decimals, which in Avro's case have to be written as a ByteBuffer.
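For reference, Avro 1.8.0 also has a built-in conversion for the decimal logical type; the sketch below shows how the same ByteBuffer could be produced with it instead of the manual BigInteger handling (the class name and helper method are my own, and it assumes the logicalType is picked up when parsing schema.avsc above):

    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.nio.ByteBuffer;

    import org.apache.avro.Conversions;
    import org.apache.avro.Schema;

    public class DecimalEncodingSketch {

        // Encode a BigDecimal for the "price" field using Avro's
        // DecimalConversion instead of manual unscaledValue() handling.
        public static ByteBuffer encodePrice(Schema avroSchema, BigDecimal value) {
            Schema priceSchema = avroSchema.getField("price").schema();
            // The conversion expects the value's scale to match the schema's scale (6)
            BigDecimal scaled = value.setScale(6, RoundingMode.HALF_UP);
            return new Conversions.DecimalConversion()
                    .toBytes(scaled, priceSchema, priceSchema.getLogicalType());
        }
    }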
I have tried this with Avro 1.8.0, Parquet 1.8.1 and Hive 1.1.0.
Any help would be appreciated.
Thanks,
Abhijeet