Hi Ryan,
I am using parquet-avro for now (though I see parquet-hive should be the one
I have been using). But I am getting an exception while reading. The file gets
written without any exception, but its size is only 4 bytes, which means the
data is not actually being written. I took the code from the test classes in
parquet-avro:
Schema avroSchema = new Schema.Parser().parse(
    new File("D:\\PB\\code\\bigdata\\hadoop2x\\src\\main\\resources\\array.avsc"));
System.out.println(avroSchema);

// generate the corresponding Parquet schema
MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
System.out.println(parquetSchema);

// create a WriteSupport object to serialize your Avro objects
AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

// choose compression scheme
CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

// set Parquet file block size and page size values
int blockSize = 256 * 1024 * 1024;
int pageSize = 64 * 1024;

Path outputPath = new Path("hdfs://<ip>:9000/user/hdfs/testavro2");
AvroParquetWriter<GenericRecord> writer =
    new AvroParquetWriter<GenericRecord>(outputPath, avroSchema);

List<Integer> emptyArray = new ArrayList<Integer>();
GenericData.Record record = new GenericRecordBuilder(avroSchema)
    .set("myarray", emptyArray).build();
writer.write(record);

AvroParquetReader<GenericRecord> reader =
    new AvroParquetReader<GenericRecord>(outputPath);
GenericRecord nextRecord = reader.read();
System.out.println(nextRecord.get("myarray"));
Output and exception:

{"type":"record","name":"myrecord","fields":[{"name":"myarray","type":{"type":"array","items":"int"}}]}
message myrecord {
  required group myarray (LIST) {
    repeated int32 array;
  }
}
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more
info.
Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: reading another 1 footers
Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
Exception in thread "main" java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://<ip>:9000/user/hdfs/testavro2 is not a Parquet file (too small)
    at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
    at parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:188)
    at parquet.hadoop.ParquetReader.<init>(ParquetReader.java:114)
    at parquet.hadoop.ParquetReader.<init>(ParquetReader.java:64)
    at parquet.avro.AvroParquetReader.<init>(AvroParquetReader.java:44)
    at TestAvroParquet.main(TestAvroParquet.java:44)
Caused by: java.lang.RuntimeException: hdfs://<ip>:9000/user/hdfs/testavro2 is not a Parquet file (too small)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
    at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
    at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
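For what it's worth, the "too small" message comes from a minimum-length check in the footer reader: a Parquet file must contain at least the leading "PAR1" magic, a 4-byte footer length, and the trailing "PAR1" magic. A 4-byte file holds only the header magic, which is what you would see if the writer is never closed, since the footer is only written by writer.close() and the snippet above does not call it. A standalone sketch of that size check (my own constants mirroring the Parquet file format, not the actual ParquetFileReader code):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the minimum-size check behind "is not a Parquet file (too small)".
// A valid file needs the leading magic, a 4-byte footer length, and the
// trailing magic: 4 + 4 + 4 = 12 bytes at minimum. A 4-byte file contains
// only the header magic written when the writer opened the file.
public class ParquetSizeCheck {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);
    static final int FOOTER_LENGTH_SIZE = 4;

    static boolean isLargeEnough(long fileLen) {
        return fileLen >= MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length;
    }

    public static void main(String[] args) {
        System.out.println(isLargeEnough(4));   // false: only the header magic
        System.out.println(isLargeEnough(12));  // true: smallest valid layout
    }
}
```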
-----Original Message-----
From: Manisha Sethi
Sent: Wednesday, November 04, 2015 10:41 PM
To: 'Ryan Blue' <[email protected]>; [email protected]
Subject: RE: Timestamp and time not being written Unsupported
Thanks a lot, Ryan, for the clarifying response! :) I will surely figure out
the steps to start contributing :) and will try out parquet-avro. :)
-----Original Message-----
From: Ryan Blue [mailto:[email protected]]
Sent: Wednesday, November 04, 2015 10:37 PM
To: Manisha Sethi <[email protected]>; [email protected]
Subject: Re: Timestamp and time not being written Unsupported
The problem you're hitting is that Hive doesn't support the int64 timestamps
that are in the Parquet spec. It supports an undocumented timestamp format
instead and we need to get it moved over to using the
int64 timestamps.
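For reference, that undocumented format is the 12-byte int96 "nano time" value: nanoseconds within the day plus a Julian day number. A rough sketch of that split (hypothetical helper names, not the NanoTimeUtils API, and assuming a non-negative epoch-millis input):

```java
import java.util.concurrent.TimeUnit;

// Rough model of Hive's 12-byte int96 timestamp ("nano time"): 8 bytes of
// nanoseconds within the day followed by a 4-byte Julian day number. These
// helpers are illustrative only and assume non-negative epoch millis.
public class NanoTimeSketch {
    // Julian day number of the Unix epoch, 1970-01-01.
    static final int JULIAN_DAY_OF_EPOCH = 2440588;

    static int julianDay(long epochMillis) {
        return JULIAN_DAY_OF_EPOCH + (int) (epochMillis / TimeUnit.DAYS.toMillis(1));
    }

    static long nanosOfDay(long epochMillis) {
        return TimeUnit.MILLISECONDS.toNanos(epochMillis % TimeUnit.DAYS.toMillis(1));
    }

    public static void main(String[] args) {
        // One day and one millisecond past the epoch.
        long ts = TimeUnit.DAYS.toMillis(1) + 1;
        System.out.println(julianDay(ts));   // 2440589
        System.out.println(nanosOfDay(ts));  // 1000000
    }
}
```

Hive serializes those 12 bytes as a Binary value, which is why a column declared as int64 ends up receiving a binary write further down the path.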
If you want to continue using Hive's Writable objects in your program and use
the int64 timestamp, then I recommend contributing support to Hive. It doesn't
look like it would be that hard, since you already understand the code path and
where this is going wrong.
I don't know if I would recommend Hive as an object model. I usually recommend
parquet-avro, since that enables you to use the same object model with both
Avro and Parquet file formats.
rb
On 11/03/2015 07:22 PM, Manisha Sethi wrote:
> Thanks Ryan!!
>
>
> I am trying to write in Parquet format using hive-exec 1.2.1; this jar
> supports writing to the ORC/Parquet file formats that Hive supports.
> My sample code looks like:
>
> MessageType schema = MessageTypeParser.parseMessageType(
>     "message basket { required int64 time (TIMESTAMP_MILLIS); }");
> Configuration config = new Configuration();
> config.set("fs.default.name", "hdfs://<ip>:9000");
> Path outDirPath = new Path("hdfs://<ip>:9000/user/hdfs/test5");
>
> ParquetWriter writer = new ParquetWriter(outDirPath,
>     new DataWritableWriteSupport() {
>         @Override
>         public WriteContext init(Configuration configuration) {
>             if (configuration.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA) == null) {
>                 configuration.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA,
>                     schema.toString());
>             }
>             return super.init(configuration);
>         }
>     }, CompressionCodecName.SNAPPY, 256 * 1024 * 1024,
>     100 * 1024, 100 * 1024, true, false, WriterVersion.PARQUET_2_0, config);
>
> List<ObjectInspector> list = new ArrayList<ObjectInspector>();
> ObjectInspector tins = ObjectInspectorFactory.getReflectionObjectInspector(
>     Timestamp.class, ObjectInspectorOptions.JAVA);
> list.add(tins);
>
> List<String> columnnames = new ArrayList<String>();
> columnnames.add("time");
>
> StructObjectInspector inspec = ObjectInspectorFactory
>     .getStandardStructObjectInspector(columnnames, list);
>
> List<Object> obj = new ArrayList<Object>();
> obj.add(new Timestamp(new Date().getTime()));
> writer.write(new ParquetHiveRecord(obj, inspec));
> writer.close();
>
> ====================================================================
>
> As the timestamp is defined in the Parquet schema by annotating an INT64
> (which designates a long), a LongStatistics object is instantiated and used
> after writing values, via code like this
> (parquet.column.statistics.Statistics.java):
> public static Statistics getStatsBasedOnType(PrimitiveType.PrimitiveTypeName type) {
>     switch (type) {
>         case INT32:   return new IntStatistics();
>         case INT64:   return new LongStatistics();
>         case FLOAT:   return new FloatStatistics();
>         case DOUBLE:  return new DoubleStatistics();
>         case BOOLEAN: return new BooleanStatistics();
>         case BINARY:  return new BinaryStatistics();
>         case INT96:   return new BinaryStatistics();
>         case FIXED_LEN_BYTE_ARRAY: return new BinaryStatistics();
>         default: throw new UnknownColumnTypeException(type);
>     }
> }
>
>
> But when the Timestamp is written, the code below is executed and the value
> is written in binary form
> (org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.java):
>
> Line 295:
>     case TIMESTAMP:
>         Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
>         recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
>         break;
>
> Which leads to parquet.column.impl.ColumnWriterV2.java:
>
> public void write(Binary value, int repetitionLevel, int definitionLevel) {
>     if (DEBUG) log(value, repetitionLevel, definitionLevel);
>     repetitionLevel(repetitionLevel);
>     definitionLevel(definitionLevel);
>     this.dataColumn.writeBytes(value);
>     this.statistics.updateStats(value);  // ====>>>> FAILS
>     this.valueCount += 1;
> }
>
> Since the statistics object is a LongStatistics, it defines an update method
> that accepts a long value; but the call is made with a Binary argument, so it
> falls through to the base class Statistics.java, which throws an
> UnsupportedOperationException.
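[A standalone model of that overload dispatch, with simplified stand-ins rather than the real parquet classes, reproduces the failure: the base class throws for every updateStats overload, and each subclass overrides only the overload for its physical type.]

```java
// Simplified stand-ins for the parquet statistics classes (not the real
// API): the base class throws for every updateStats overload, and a
// LongStatistics-style subclass overrides only the long overload, so a
// binary value falls through to the base class and throws.
abstract class Stats {
    public void updateStats(long value) {
        throw new UnsupportedOperationException();
    }
    public void updateStats(byte[] value) {
        throw new UnsupportedOperationException();
    }
}

class LongStats extends Stats {
    private long max = Long.MIN_VALUE;
    @Override
    public void updateStats(long value) {  // only the long overload is overridden
        max = Math.max(max, value);
    }
}

public class StatsDispatchSketch {
    static boolean throwsForBinary(Stats s) {
        try {
            s.updateStats(new byte[12]);  // 12 bytes, like an int96 nano-time
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Stats stats = new LongStats();  // column declared as int64
        stats.updateStats(42L);         // fine: dispatches to the subclass
        System.out.println(throwsForBinary(stats));  // true: base class throws
    }
}
```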
>
> Complete StackTrace:
>
> Exception in thread "main" java.lang.RuntimeException: Parquet record is malformed: null
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
>     at parquet.hadoop.ParquetWriter.write(ParquetWriter.java:258)
>     at ParquetTestWriter$1.run(ParquetTestWriter.java:126)
>     at ParquetTestWriter$1.run(ParquetTestWriter.java:1)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>     at ParquetTestWriter.main(ParquetTestWriter.java:37)
> Caused by: java.lang.UnsupportedOperationException
>     at parquet.column.statistics.Statistics.updateStats(Statistics.java:115)
>     at parquet.column.impl.ColumnWriterV2.write(ColumnWriterV2.java:158)
>     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:346)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:297)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:106)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
>     ... 10 more
>
> Appreciate your help!
>
> -Manisha
>
> -----Original Message-----
> From: Ryan Blue [mailto:[email protected]]
> Sent: Tuesday, November 03, 2015 10:19 PM
> To: [email protected]
> Subject: Re: Timestamp and time not being written Unsupported
>
> On 11/03/2015 12:35 AM, Manisha Sethi wrote:
>> Hi
>>
>> I am trying to write a timestamp using int64 (TIMESTAMP_MILLIS) via
>> ParquetWriter, using the jar hive-exec 1.2.1... but I am getting an
>> unsupported exception.
>> The issue is: when the call reaches addBinary:
>>
>> case BINARY:
>>     byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
>>     recordConsumer.addBinary(Binary.fromByteArray(vBinary));
>>     break;
>> case TIMESTAMP:
>>     Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
>>     recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
>>     break;
>> case DECIMAL:
>>     HiveDecimal vDecimal = ((HiveDecimal) inspector.getPrimitiveJavaObject(value));
>>     DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
>>     recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
>>     break;
>>
>> Then, in the column writer, it fails at updateStatistics, since the
>> statistics object is a LongStatistics (corresponding to the column's int64
>> data type) but the value passed is binary, for which no update is defined:
>>
>> this.repetitionLevelColumn.writeInteger(repetitionLevel);
>> this.definitionLevelColumn.writeInteger(definitionLevel);
>> this.dataColumn.writeBytes(value);
>> updateStatistics(value);
>>
>> this.statistics.updateStats(value); ====>>>> The method is not defined for
>> LongStatistics, hence it throws an unsupported exception.
>>
>> ________________________________
>>
>>
>
> Manisha,
>
> Thanks for taking the time to e-mail about this. How are you writing this
> timestamp? Are you using Hive, or are you trying to use Hive's object model
> in your own code?
>
> Could you also send the stack trace that you're seeing? I'm confused about
> why the method would be undefined, since it should be defined for the types
> correctly.
>
> Thanks,
>
> rb
>
> --
> Ryan Blue
> Software Engineer
> Cloudera, Inc.
>
> ________________________________
>
--
Ryan Blue
Software Engineer
Cloudera, Inc.
________________________________