Hello,

I could not find a user mailing list; please direct me there if that is a more 
appropriate place for this question.

I have a Map-Reduce action that is part of an Oozie workflow that reads Parquet 
files from HDFS.  The purpose of the action is to read each Parquet record and 
output it as ‘|’-delimited text.  I am working with CDH 4, Parquet 1.6.0, and 
the old org.apache.hadoop.mapred API.  Here is the error I am getting; it 
appears to occur as mappers attempt to read the Parquet records and convert 
them to Avro (I am using AvroReadSupport):



2016-01-19 16:46:42,626 WARN org.apache.hadoop.mapred.Child: Error running child
parquet.io.ParquetDecodingException: Can not read value at 0 in block 0 in file hdfs://nameservice1/group/foam/prototypes/parquet/ingest/201511/H788798/LDR/part-00023-r-00023.snappy.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
    at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:90)
    at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: org.apache.avro.AvroRuntimeException: Bad index: 2
    at org.apache.avro.mapred.Pair.put(Pair.java:155)
    at parquet.avro.AvroIndexedRecordConverter.set(AvroIndexedRecordConverter.java:116)
    at parquet.avro.AvroIndexedRecordConverter.access$000(AvroIndexedRecordConverter.java:38)
    at parquet.avro.AvroIndexedRecordConverter$1.add(AvroIndexedRecordConverter.java:76)
    at parquet.avro.AvroIndexedRecordConverter$FieldStringConverter.addBinary(AvroIndexedRecordConverter.java:252)
    at parquet.column.impl.ColumnReaderImpl$2$7.writeValue(ColumnReaderImpl.java:322)
    at parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:369)
    at parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:400)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:173)


What is this “Pair” schema that the program is trying to put values into?  The 
schema of my data has 14 fields (the avro.schema value below); how do I provide 
it to the InputFormat or the ReadSupport class?
Also, should my mapper be written to work with Parquet Group values or Avro 
GenericRecord values?  Here is my Oozie config for the action:

    <action name="PrepareCdrExportMR">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/group/foam/prototypes/parquet/ingest/${release}/${group}/LDR</value>
                </property>

                <property>
                    <name>mapred.output.dir</name>
                    <value>${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT</value>
                </property>


                <property>
                    <name>mapred.mapper.class</name>
                    <value>com.humedica.duplo.foam.CdrExportMapper</value>
                </property>

                <property>
                    <name>mapred.reducer.class</name>
                    <value>com.humedica.duplo.foam.CdrExportReducer</value>
                </property>

                <property>
                    <name>mapred.output.compress</name>
                    <value>false</value>
                </property>

                <property>
                    <name>mapred.output.compression.codec</name>
                    <value>org.apache.hadoop.io.compress.GzipCodec</value>
                </property>

                <property>
                    <name>mapred.input.format.class</name>
                    <value>parquet.hadoop.mapred.DeprecatedParquetInputFormat</value>
                </property>

                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.hadoop.mapred.TextOutputFormat</value>
                </property>

                <property>
                    <name>mapred.textoutputformat.separator</name>
                    <value>|</value>
                </property>

                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>8</value>
                </property>

                <property>
                    <name>parquet.read.support.class</name>
                    <value>parquet.avro.AvroReadSupport</value>
                </property>

                <property>
                    <name>avro.schema</name>
                    <value>{
                        "namespace": "org.apache.avro.mapred",
                        "type": "record",
                        "name": "Foo",
                        "fields": [
                        {"name": "sourceid", "type": "string"},
                        {"name": "patientid", "type": "string"},
                        {"name": "encounterid", "type": "string"},
                        {"name": "encounterdate", "type": "string"},
                        {"name": "groupid", "type": "string"},
                        {"name": "cdsid", "type": "string"},
                        {"name": "seq", "type": "string"},
                        {"name": "section", "type": "string"},
                        {"name": "foam", "type": "string"},
                        {"name": "head", "type": "string"},
                        {"name": "body", "type": "string"},
                        {"name": "tail", "type": "string"},
                        {"name": "observationdate", "type": "string"},
                        {"name": "source", "type": "string"}
                        ]
                        }
                    </value>
                </property>

                <property>
                    <name>fs.hdfs.impl.disable.cache</name>
                    <value>true</value>
                </property>


                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>

                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>

                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>

                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>

                <property>
                    <name>io.serializations</name>
                    <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="HdfsToLocal"/>
        <error to="PrepareCdrExportFailureDBUpdate"/>
    </action>
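For reference, the per-record formatting my mapper performs is essentially the following. This is a self-contained sketch (the class and method names here are my own, not from the Parquet or Avro APIs), written so it is independent of whether each value arrives as a Parquet Group or an Avro GenericRecord:

```java
import java.util.Arrays;
import java.util.List;

public class PipeFormat {

    // Matches mapred.textoutputformat.separator=| in the workflow config above.
    private static final String SEP = "|";

    /**
     * Joins a record's field values into one '|'-delimited line.
     * Null fields become empty strings so every output line keeps
     * the same column count as the schema.
     */
    public static String toPipeDelimitedLine(List<?> fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                line.append(SEP);
            }
            Object field = fields.get(i);
            line.append(field == null ? "" : field.toString());
        }
        return line.toString();
    }

    public static void main(String[] args) {
        List<Object> record = Arrays.asList("SRC1", "PAT42", null, "2015-11-01");
        System.out.println(toPipeDelimitedLine(record));  // SRC1|PAT42||2015-11-01
    }
}
```

In the real mapper I would build the `List` by pulling the 14 fields out of the input value in schema order, then emit the joined line as a Text value with a NullWritable key, matching the mapred.mapoutput.* classes configured above.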
