[ https://issues.apache.org/jira/browse/AVRO-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116151#comment-15116151 ]

Yong Zhang commented on AVRO-1786:
----------------------------------

Hi, Ryan:

I listed the Mapper and Reducer method signatures in the ticket; here they are 
again briefly, with a bit more explanation:

Mapper side:
{code}
public void map(AvroKey<SpecificRecord> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
    SpecificRecord specificRecord = key.datum();
    // outputKey and lists are built earlier in the map() body
    context.write(outputKey, new AvroValue<SpecificRecord>(lists));
}
{code}
// So we know that the mapper doesn't complain at all about the Avro record.

Reducer side:
{code}
@Override
protected void reduce(PartitionKey key, Iterable<AvroValue<SpecificRecord>> values,
        Context context) throws IOException, InterruptedException {
}
{code}
// The PartitionKey object is a custom class that partitions the values by the ID
// and also does the secondary sort, ordering the data by timestamp.
// I know the key part is fine, as the KeyDeserializer in ReduceContextImpl works fine.
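
For reference, this is roughly the shape of such a composite key; a minimal
sketch only, assuming plain id/timestamp fields (our real PartitionKey differs
in details):
{code}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical composite key: partition/group by id, secondary-sort by timestamp.
public class PartitionKeySketch implements WritableComparable<PartitionKeySketch> {
    long id;        // drives partitioning and grouping
    long timestamp; // drives the secondary sort within a group

    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        timestamp = in.readLong();
    }

    // Full sort order: id first, then timestamp, so each id's records arrive
    // at the reducer in time order.
    public int compareTo(PartitionKeySketch o) {
        int c = Long.compare(id, o.id);
        return c != 0 ? c : Long.compare(timestamp, o.timestamp);
    }

    // Grouping comparator over id only, so all timestamps for one id land in
    // a single reduce() call; a matching Partitioner would also hash id only.
    public static class GroupComparator extends WritableComparator {
        public GroupComparator() { super(PartitionKeySketch.class, true); }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return Long.compare(((PartitionKeySketch) a).id,
                                ((PartitionKeySketch) b).id);
        }
    }
}
{code}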

The problem happens in ValueDeserializer.deserialize(value), which indeed throws 
the exception from inside the AVRO codebase. I even hacked the Hadoop code to 
dump the key when the exception happens.
Yes, I understand the exception is raised for the intermediate data, but I don't 
know why AVRO treats this intermediate data as invalid AVRO data in this case. 
Most of the data is fine, since most reducers finished successfully, but from 
the dump I can see at least 3 records that throw the exception; after that I 
stopped dumping further exception records.
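
For context, here is a minimal sketch of how this job shape is typically wired 
up (an assumption on my part, not our exact driver; MergeDriver and the paths 
are hypothetical names, while PartitionKey and Lists are the classes from this 
ticket). Registering the map-output value schema via AvroJob is what routes the 
intermediate values through AvroSerialization, and the failing 
AvroDeserializer.deserialize() is the read half of exactly that path:
{code}
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "merge-lists");
        job.setJarByClass(MergeDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0])); // Dataset1
        FileInputFormat.addInputPath(job, new Path(args[1])); // Dataset2
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setInputFormatClass(AvroKeyInputFormat.class);
        // (the real driver also sets our Mapper and Reducer classes here)

        job.setMapOutputKeyClass(PartitionKey.class);
        // This registers AvroSerialization for the map-output value; the
        // reduce side's AvroDeserializer.deserialize() reads it back.
        AvroJob.setMapOutputValueSchema(job, Lists.getClassSchema());

        AvroJob.setOutputKeySchema(job, Lists.getClassSchema());
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
{code}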
Our schema may have changed at some point (I will check that), but the new 
schema should always be compatible with the old data. For example, even if the 
troublesome data was in fact generated with an old schema, these 3 records still 
read fine in Hive (where the table is defined with the latest schema).
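
To sanity-check that compatibility assumption, a minimal sketch using Avro's 
SchemaCompatibility helper (present in recent 1.7.x releases, so worth checking 
against the exact version on the cluster; the .avsc file names are placeholders):
{code}
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

public class CheckCompat {
    public static void main(String[] args) throws Exception {
        // Reader = the latest schema; writer = the old schema the data may
        // have been written with. File names are placeholders.
        Schema reader = new Schema.Parser().parse(new File("lists-latest.avsc"));
        Schema writer = new Schema.Parser().parse(new File("lists-old.avsc"));

        SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);

        // COMPATIBLE means every datum written with the old schema resolves
        // under the new one; anything else indicates a real evolution break.
        System.out.println(result.getType() + ": " + result.getDescription());
    }
}
{code}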

What I am going to do next:

1) I will dump one good AVRO record + one bad AVRO record (I have the 3 ids) 
out, store them in a separate AVRO file, then see if I can still reproduce this 
problem, and compare what the difference is between these 2 records (see the 
sketch after this list).
2) I will check the MR counters during this test.
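
For step 1, roughly this kind of harness is what I have in mind; a minimal 
sketch only, assuming the dumped records live in a container file named 
dumped.avro and bind to the generated Lists class (the shuffle path actually 
goes through a validating decoder, so this only approximates it):
{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        DataFileReader<Lists> in = new DataFileReader<Lists>(
                new File("dumped.avro"), new SpecificDatumReader<Lists>(Lists.class));

        SpecificDatumWriter<Lists> writer = new SpecificDatumWriter<Lists>(Lists.class);
        SpecificDatumReader<Lists> reader = new SpecificDatumReader<Lists>(Lists.class);

        for (Lists rec : in) {
            // Encode to raw binary (no container header), like the MR
            // intermediate data.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder enc = EncoderFactory.get().directBinaryEncoder(bos, null);
            writer.write(rec, enc);
            enc.flush();

            // Decode the bytes back; a bad record should throw the same
            // IndexOutOfBoundsException here as in the reducer.
            Decoder dec = DecoderFactory.get().directBinaryDecoder(
                    new ByteArrayInputStream(bos.toByteArray()), null);
            System.out.println(reader.read(null, dec));
        }
        in.close();
    }
}
{code}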


> Strange IndexOutofBoundException in GenericDatumReader.readString
> -----------------------------------------------------------------
>
>                 Key: AVRO-1786
>                 URL: https://issues.apache.org/jira/browse/AVRO-1786
>             Project: Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.7.4, 1.7.7
>         Environment: CentOS 6.5 Linux x64, 2.6.32-358.14.1.el6.x86_64
> Use IBM JVM:
> IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed References 
> 20140515_199835 (JIT enabled, AOT enabled)
>            Reporter: Yong Zhang
>
> Our production cluster is CentOS 6.5 (2.6.32-358.14.1.el6.x86_64), running 
> IBM BigInsight V3.0.0.2. In Apache terms, it is Hadoop 2.2.0 with MRV1 (no 
> yarn), and it comes with AVRO 1.7.4, running on the IBM J9 VM (build 2.7, JRE 
> 1.7.0 Linux amd64-64 Compressed References 20140515_199835, JIT enabled, AOT 
> enabled). Not sure if the JDK matters, but it is NOT the Oracle JVM.
> We have an ETL implemented as a chain of MR jobs. One MR job merges 2 sets of 
> AVRO data: Dataset1 is in HDFS location A, Dataset2 is in HDFS location B, 
> and both contain AVRO records bound to the same AVRO schema. Each record 
> contains a unique id field and a timestamp field. The MR job merges the 
> records based on the ID, uses the record with the later timestamp to replace 
> the one with the earlier timestamp, and emits the final AVRO record. Very 
> straightforward.
> Now we face a problem where one reducer keeps failing with the following 
> stacktrace on the JobTracker:
> {code}
> java.lang.IndexOutOfBoundsException
>       at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:191)
>       at java.io.DataInputStream.read(DataInputStream.java:160)
>       at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>       at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>       at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>       at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:143)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:125)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:121)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>       at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>       at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:108)
>       at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:48)
>       at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:142)
>       at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:117)
>       at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:297)
>       at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:165)
>       at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
>       at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>       at java.security.AccessController.doPrivileged(AccessController.java:366)
>       at javax.security.auth.Subject.doAs(Subject.java:572)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1502)
>       at org.apache.hadoop.mapred.Child.main(Child.java:249)
> {code}
> Here are my Mapper and Reducer methods:
> Mapper:
> public void map(AvroKey<SpecificRecord> key, NullWritable value, Context 
> context) throws IOException, InterruptedException 
> Reducer:
> protected void reduce(CustomPartitionKeyClass key, 
> Iterable<AvroValue<SpecificRecord>> values, Context context) throws 
> IOException, InterruptedException 
> What bothers me are the following facts:
> 1) All the mappers finish without error
> 2) Most of the reducers finish without error, but one reducer keeps failing 
> with the above error.
> 3) It looks like it is caused by the data? But keep in mind that all the avro 
> records passed the mapper side, yet failed in one reducer. 
> 4) From the stacktrace, it looks like our reducer code was NOT invoked yet; 
> it failed before that. So my guess is that all the AVRO records pass through 
> the mapper side, but AVRO complains about the intermediate result generated 
> by one mapper? In my understanding, that will be a sequence file generated 
> by Hadoop, and the value part will be the AVRO bytes. Does the above error 
> mean that AVRO cannot deserialize the value part from the sequence file?
> 5) Our ETL ran fine for more than a year, but we suddenly got this error 
> starting one day, and have kept hitting this problem since. 
> 6) If it helps, here is the schema for the avro record:
> {code}
> {
>     "namespace" : "company name",
>     "type" : "record",
>     "name" : "Lists",
>     "fields" : [
>         {"name" : "account_id", "type" : "long"},
>         {"name" : "list_id", "type" : "string"},
>         {"name" : "sequence_id", "type" : ["int", "null"]} ,
>         {"name" : "name", "type" : ["string", "null"]},
>         {"name" : "state", "type" : ["string", "null"]},
>         {"name" : "description", "type" : ["string", "null"]},
>         {"name" : "dynamic_filtered_list", "type" : ["int", "null"]},
>         {"name" : "filter_criteria", "type" : ["string", "null"]},
>         {"name" : "created_at", "type" : ["long", "null"]},
>         {"name" : "updated_at", "type" : ["long", "null"]},
>         {"name" : "deleted_at", "type" : ["long", "null"]},
>         {"name" : "favorite", "type" : ["int", "null"]},
>         {"name" : "delta", "type" : ["boolean", "null"]},
>         {
>             "name" : "list_memberships", "type" : {
>                 "type" : "array", "items" : {
>                     "name" : "ListMembership", "type" : "record",
>                     "fields" : [
>                         {"name" : "channel_id", "type" : "string"},
>                         {"name" : "created_at", "type" : ["long", "null"]},
>                         {"name" : "created_source", "type" : ["string", "null"]},
>                         {"name" : "deleted_at", "type" : ["long", "null"]},
>                         {"name" : "sequence_id", "type" : ["long", "null"]}
>                     ]
>                 }
>             }
>         }
>     ]
> }
> {code}


