This seems to be an intermittent issue; it even starts appearing when I am closing the writer.
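The 4-byte file and the "not a Parquet file (too small)" error in the thread below both follow from Parquet's footer-at-close layout: the leading magic bytes are written when the writer opens, but the data and trailing footer only reach the file on close(). The sketch below is a self-contained toy model of that behavior, not the real Parquet writer; the names (`FooterAtClose`, `ToyWriter`, `looksComplete`) are invented for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Toy model of a footer-at-close format such as Parquet: the header magic is
// written when the writer is opened, but the buffered data and the trailing
// magic only reach the file when close() is called. A reader that opens the
// file before close() sees just the 4 header bytes -- "too small".
public class FooterAtClose {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    static class ToyWriter {
        private final Path path;
        private final ByteArrayOutputStream buffered = new ByteArrayOutputStream();

        ToyWriter(Path path) throws IOException {
            this.path = path;
            Files.write(path, MAGIC);  // header magic lands immediately
        }

        void write(String record) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            buffered.write(bytes, 0, bytes.length);  // data stays in memory
        }

        void close() throws IOException {  // data + footer magic land here
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(MAGIC, 0, MAGIC.length);
            byte[] data = buffered.toByteArray();
            out.write(data, 0, data.length);
            out.write(MAGIC, 0, MAGIC.length);
            Files.write(path, out.toByteArray());
        }
    }

    // Mimics the reader's footer check: a complete file must be big enough to
    // hold both magics and must end with the trailing magic.
    static boolean looksComplete(Path path) throws IOException {
        byte[] bytes = Files.readAllBytes(path);
        if (bytes.length < 2 * MAGIC.length) return false;  // "too small"
        for (int i = 0; i < MAGIC.length; i++) {
            if (bytes[bytes.length - MAGIC.length + i] != MAGIC[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("toy", ".parquet");
        ToyWriter writer = new ToyWriter(path);
        writer.write("record-1");
        // Reading here mirrors the thread: only the 4 header bytes exist.
        System.out.println("before close: complete=" + looksComplete(path)
                + " size=" + Files.size(path));
        writer.close();
        System.out.println("after close: complete=" + looksComplete(path)
                + " size=" + Files.size(path));
    }
}
```

This is why Ryan's one-line advice resolves the reported exception: moving `writer.close()` before constructing the reader lets the footer reach the file first.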
-----Original Message-----
From: Ryan Blue [mailto:[email protected]]
Sent: Thursday, November 05, 2015 10:38 PM
To: Manisha Sethi <[email protected]>; [email protected]
Subject: Re: Timestamp and time not being written Unsopported

It looks like you need to close the Parquet file before you read it.

rb

On 11/04/2015 11:35 PM, Manisha Sethi wrote:
> Hi Ryan,
>
> I am using parquet-avro as of now (though I see parquet-hive should be
> the one which I have been using), but I am getting an exception while
> reading. The file gets written without an exception, but the file size
> is 4 bytes, which means the data is not actually being written. The code
> is taken from the parquet-avro test classes:
>
> Schema avroSchema = new Schema.Parser().parse(new
>     File("D:\\PB\\code\\bigdata\\hadoop2x\\src\\main\\resources\\array.avsc"));
> System.out.println(avroSchema);
>
> // generate the corresponding Parquet schema
> MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
> System.out.println(parquetSchema);
>
> // create a WriteSupport object to serialize your Avro objects
> AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
>
> // choose compression scheme
> CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>
> // set Parquet file block size and page size values
> int blockSize = 256 * 1024 * 1024;
> int pageSize = 64 * 1024;
>
> Path outputPath = new Path("hdfs://<ip>:9000/user/hdfs/testavro2");
> AvroParquetWriter<GenericRecord> writer =
>     new AvroParquetWriter<GenericRecord>(outputPath, avroSchema);
> List<Integer> emptyArray = new ArrayList<Integer>();
> GenericData.Record record = new GenericRecordBuilder(avroSchema)
>     .set("myarray", emptyArray).build();
> writer.write(record);
>
> AvroParquetReader<GenericRecord> reader =
>     new AvroParquetReader<GenericRecord>(outputPath);
> GenericRecord nextRecord = reader.read();
> System.out.println(nextRecord.get("myarray"));
>
> Exception:
> {"type":"record","name":"myrecord","fields":[{"name":"myarray","type":{"type":"array","items":"int"}}]}
>
> message myrecord {
>   required group myarray (LIST) {
>     repeated int32 array;
>   }
> }
>
> log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> Exception in thread "main"
> Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
> Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: reading another 1 footers
> Nov 5, 2015 12:46:51 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
> java.io.IOException: Could not read footer: java.lang.RuntimeException:
> hdfs://152.144.226.102:9000/user/hdfs/testavro2 is not a Parquet file (too small)
>     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
>     at parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:188)
>     at parquet.hadoop.ParquetReader.<init>(ParquetReader.java:114)
>     at parquet.hadoop.ParquetReader.<init>(ParquetReader.java:64)
>     at parquet.avro.AvroParquetReader.<init>(AvroParquetReader.java:44)
>     at TestAvroParquet.main(TestAvroParquet.java:44)
> Caused by: java.lang.RuntimeException: hdfs://<ip>:9000/user/hdfs/testavro2 is not a Parquet file (too small)
>     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
>     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
>     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> -----Original Message-----
> From: Manisha Sethi
> Sent: Wednesday, November 04, 2015 10:41 PM
> To: 'Ryan Blue' <[email protected]>; [email protected]
> Subject: RE: Timestamp and time not being written Unsopported
>
> Thanks a lot Ryan for the clarified response!!! :) I will surely
> figure out the steps to start contributing :) And will try out
> parquet-avro.. :)
>
> -----Original Message-----
> From: Ryan Blue [mailto:[email protected]]
> Sent: Wednesday, November 04, 2015 10:37 PM
> To: Manisha Sethi <[email protected]>; [email protected]
> Subject: Re: Timestamp and time not being written Unsopported
>
> The problem you're hitting is that Hive doesn't support the int64
> timestamps that are in the Parquet spec. It supports an undocumented
> timestamp format instead, and we need to get it moved over to using the
> int64 timestamps.
>
> If you want to continue using Hive's Writable objects in your program and
> use the int64 timestamp, then I recommend contributing support to Hive. It
> doesn't look like it would be that hard, since you already understand the
> code path and where this is going wrong.
>
> I don't know if I would recommend Hive as an object model. I usually
> recommend parquet-avro, since that enables you to use the same object
> model with both Avro and Parquet file formats.
>
> rb
>
> On 11/03/2015 07:22 PM, Manisha Sethi wrote:
>> Thanks Ryan!!
>>
>> I am trying to write in Parquet format using hive-exec 1.2.1; this jar
>> supports writing to the ORC and Parquet file formats that Hive supports.
>> My sample code looks like:
>>
>> MessageType schema = MessageTypeParser.parseMessageType(
>>     "message basket { required int64 time (TIMESTAMP_MILLIS); }");
>> Configuration config = new Configuration();
>> config.set("fs.default.name", "hdfs://<ip>:9000");
>> String outFilePath;
>> Path outDirPath = new Path("hdfs://<ip>:9000/user/hdfs/test5");
>>
>> ParquetWriter writer = new ParquetWriter(outDirPath,
>>     new DataWritableWriteSupport() {
>>         private DataWritableWriter writer;
>>
>>         @Override
>>         public WriteContext init(Configuration configuration) {
>>             if (configuration.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA) == null) {
>>                 configuration.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA,
>>                     schema.toString());
>>             }
>>             return super.init(configuration);
>>         }
>>     },
>>     CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 100 * 1024,
>>     100 * 1024, true, false, WriterVersion.PARQUET_2_0, config);
>>
>> List<ObjectInspector> list = new ArrayList<ObjectInspector>();
>> ObjectInspector tins = ObjectInspectorFactory.getReflectionObjectInspector(
>>     Timestamp.class, ObjectInspectorOptions.JAVA);
>> list.add(tins);
>>
>> List<String> columnnames = new ArrayList<String>();
>> columnnames.add("time");
>>
>> StructObjectInspector inspec =
>>     ObjectInspectorFactory.getStandardStructObjectInspector(columnnames, list);
>>
>> List<Object> obj = new ArrayList<Object>();
>> obj.add(new Timestamp(new Date().getTime()));
>> writer.write(new ParquetHiveRecord(obj, inspec));
>> writer.close();
>>
>> ====================================================================
>>
>> Since the timestamp is defined in the Parquet schema by annotating an
>> INT64 (which designates a long), a LongStatistics object is instantiated
>> to be used after writing values.
>>
>> Using code like this (parquet.column.statistics.Statistics.java;
>> decompiled, with the enum case labels restored for readability):
>>
>> public static Statistics getStatsBasedOnType(PrimitiveType.PrimitiveTypeName type) {
>>     switch (type) {
>>     case INT32:                return new IntStatistics();
>>     case INT64:                return new LongStatistics();
>>     case FLOAT:                return new FloatStatistics();
>>     case DOUBLE:               return new DoubleStatistics();
>>     case BOOLEAN:              return new BooleanStatistics();
>>     case BINARY:               return new BinaryStatistics();
>>     case INT96:                return new BinaryStatistics();
>>     case FIXED_LEN_BYTE_ARRAY: return new BinaryStatistics();
>>     }
>>     throw new UnknownColumnTypeException(type);
>> }
>>
>> But when a Timestamp is written, the code below is executed and the value
>> is written in binary form
>> (org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.java, line 295):
>>
>> case TIMESTAMP:
>>     Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
>>     recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
>>     break;
>>
>> Which leads to (parquet.column.impl.ColumnWriterV2.java):
>>
>> public void write(Binary value, int repetitionLevel, int definitionLevel) {
>>     if (DEBUG) log(value, repetitionLevel, definitionLevel);
>>     repetitionLevel(repetitionLevel);
>>     definitionLevel(definitionLevel);
>>     this.dataColumn.writeBytes(value);
>>     this.statistics.updateStats(value);  // ====>>>> FAILS
>>     this.valueCount += 1;
>> }
>>
>> Since statistics is a LongStatistics instance, its updateStats method
>> accepts a long value; but the call is made with a Binary argument, so it
>> dispatches to the base class Statistics.java, which throws an unsupported
>> operation exception.
>>
>> Complete stack trace:
>>
>> Exception in thread "main" java.lang.RuntimeException: Parquet record is malformed: null
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
>>     at parquet.hadoop.ParquetWriter.write(ParquetWriter.java:258)
>>     at ParquetTestWriter$1.run(ParquetTestWriter.java:126)
>>     at ParquetTestWriter$1.run(ParquetTestWriter.java:1)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>>     at ParquetTestWriter.main(ParquetTestWriter.java:37)
>> Caused by: java.lang.UnsupportedOperationException
>>     at parquet.column.statistics.Statistics.updateStats(Statistics.java:115)
>>     at parquet.column.impl.ColumnWriterV2.write(ColumnWriterV2.java:158)
>>     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:346)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:297)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:106)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
>>     at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
>>     ... 10 more
>>
>> Appreciate your help!
>>
>> -Manisha
>>
>> -----Original Message-----
>> From: Ryan Blue [mailto:[email protected]]
>> Sent: Tuesday, November 03, 2015 10:19 PM
>> To: [email protected]
>> Subject: Re: Timestamp and time not being written Unsopported
>>
>> On 11/03/2015 12:35 AM, Manisha Sethi wrote:
>>> Hi,
>>>
>>> I am trying to write a timestamp using int64 (TIMESTAMP_MILLIS) via
>>> ParquetWriter using the jar hive-exec 1.2.1, but I am getting an
>>> unsupported operation exception. The issue is: when the call reaches
>>> addBinary:
>>>
>>>     break;
>>> case BINARY:
>>>     byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
>>>     recordConsumer.addBinary(Binary.fromByteArray(vBinary));
>>>     break;
>>> case TIMESTAMP:
>>>     Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
>>>     recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
>>>     break;
>>> case DECIMAL:
>>>     HiveDecimal vDecimal = ((HiveDecimal) inspector.getPrimitiveJavaObject(value));
>>>     DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
>>>     recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
>>>     break;
>>>
>>> Then in the column writer it fails at updateStatistics, since the
>>> statistics object is a LongStatistics (corresponding to the column's
>>> int64 type), but the value is binary, which it does not accept:
>>>
>>>     this.repetitionLevelColumn.writeInteger(repetitionLevel);
>>>     this.definitionLevelColumn.writeInteger(definitionLevel);
>>>     this.dataColumn.writeBytes(value);
>>>     updateStatistics(value);
>>>
>>> this.statistics.updateStats(value); ====>>>> the method is not defined
>>> for LongStatistics, hence it throws an unsupported operation exception.
>>>
>>> ________________________________
>>
>> Manisha,
>>
>> Thanks for taking the time to e-mail about this. How are you writing this
>> timestamp? Are you using Hive, or are you trying to use Hive's object
>> model in your own code?
>>
>> Could you also send the stack trace that you're seeing? I'm confused
>> about why the method would be undefined, since it should be defined for
>> the types correctly.
>>
>> Thanks,
>>
>> rb
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Cloudera, Inc.
>>
>> ________________________________
>
>
> --
> Ryan Blue
> Software Engineer
> Cloudera, Inc.
>
> ________________________________

--
Ryan Blue
Software Engineer
Cloudera, Inc.

________________________________
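As a footnote to the int64-versus-binary mismatch discussed above: a TIMESTAMP_MILLIS (int64) column stores plain epoch milliseconds, so the value that would route through the long-writing path (and that LongStatistics can track) is simply `Timestamp.getTime()`. The sketch below is plain-JDK illustration of that conversion only; the record-consumer call appears just in a comment because it needs Parquet classes, and the class and method names here are invented for the example.

```java
import java.sql.Timestamp;

// Sketch of the int64 timestamp path: a TIMESTAMP_MILLIS column stores plain
// epoch milliseconds, so the long a writer would hand to the column is
// simply Timestamp.getTime().
public class TimestampMillis {

    static long toTimestampMillis(Timestamp ts) {
        // getTime() already folds the fractional seconds down to millisecond
        // precision; sub-millisecond nanos are truncated.
        return ts.getTime();
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2015-11-05 12:46:51.123");
        long millis = toTimestampMillis(ts);
        // With an int64 (TIMESTAMP_MILLIS) schema the write would then be:
        //   recordConsumer.addLong(millis);   // hypothetical call site
        System.out.println(millis % 1000);  // the .123 fraction survives as 123 ms
    }
}
```

Writing the long directly keeps the value consistent with the LongStatistics instantiated for an int64 column, instead of the int96 NanoTime binary that triggers the UnsupportedOperationException in this thread.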
