[ https://issues.apache.org/jira/browse/AVRO-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yong Zhang updated AVRO-1786:
-----------------------------
    Description: 
Our production cluster is CentOS 6.5 (2.6.32-358.14.1.el6.x86_64), running IBM 
BigInsight V3.0.0.2. In Apache terms, that is Hadoop 2.2.0 with MRv1 (no YARN) and 
Avro 1.7.4, running on IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed 
References 20140515_199835, JIT enabled, AOT enabled). Not sure if the JDK matters, 
but it is NOT the Oracle JVM.

We have an ETL implemented as a chain of MR jobs. One MR job merges two sets of 
Avro data: Dataset1 is in HDFS location A, Dataset2 is in HDFS location B, and 
both contain Avro records bound to the same Avro schema. Each record contains a 
unique ID field and a timestamp field. The MR job merges the records by ID, keeps 
the record with the later timestamp over the one with the earlier timestamp, and 
emits the final Avro record. Very straightforward.
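
For illustration, the merge logic in the reducer boils down to something like the 
following simplified sketch (the generated Lists class, its getUpdatedAt() getter, 
and the class names are illustrative stand-ins for our real SpecificRecord code, 
not the exact implementation):

import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Simplified merge reducer: for each ID (carried in the custom key), keep the
// record with the latest updated_at timestamp and emit it as the final record.
public class MergeReducer
        extends Reducer<CustomPartitionKeyClass, AvroValue<Lists>, AvroKey<Lists>, NullWritable> {

    @Override
    protected void reduce(CustomPartitionKeyClass key, Iterable<AvroValue<Lists>> values,
                          Context context) throws IOException, InterruptedException {
        Lists latest = null;
        for (AvroValue<Lists> value : values) {
            Lists record = value.datum();
            Long ts = record.getUpdatedAt();                                  // ["long","null"] union -> Long
            Long latestTs = (latest == null) ? null : latest.getUpdatedAt();
            if (latest == null || (ts != null && (latestTs == null || ts > latestTs))) {
                // copy the datum, since Hadoop reuses the AvroValue wrapper between iterations
                latest = Lists.newBuilder(record).build();
            }
        }
        if (latest != null) {
            context.write(new AvroKey<Lists>(latest), NullWritable.get());
        }
    }
}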

Now we are facing a problem where one reducer keeps failing with the following 
stack trace on the JobTracker:

java.lang.IndexOutOfBoundsException
        at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:191)
        at java.io.DataInputStream.read(DataInputStream.java:160)
        at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:143)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:125)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:121)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
        at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:108)
        at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:48)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:142)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:117)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:297)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:165)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(AccessController.java:366)
        at javax.security.auth.Subject.doAs(Subject.java:572)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1502)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

Here are my Mapper and Reducer method signatures:

Mapper:
public void map(AvroKey<SpecificRecord> key, NullWritable value, Context context) throws IOException, InterruptedException

Reducer:
protected void reduce(CustomPartitionKeyClass key, Iterable<AvroValue<SpecificRecord>> values, Context context) throws IOException, InterruptedException
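
For completeness, the job driver is configured along these lines (a simplified 
sketch; the driver, mapper, and reducer class names are illustrative, while the 
Avro/Hadoop calls are the standard avro-mapred 1.7.x and Hadoop 2.x API):

import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "merge-avro-datasets");
        job.setJarByClass(MergeDriver.class);
        job.setMapperClass(MergeMapper.class);
        job.setReducerClass(MergeReducer.class);

        // both input locations hold Avro container files bound to the Lists schema
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, Lists.getClassSchema());

        // intermediate map output: custom partition key + Avro-serialized value
        job.setMapOutputKeyClass(CustomPartitionKeyClass.class);
        job.setMapOutputValueClass(AvroValue.class);
        AvroJob.setMapOutputValueSchema(job, Lists.getClassSchema());

        // final output: the merged Avro records
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        AvroJob.setOutputKeySchema(job, Lists.getClassSchema());
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // HDFS location A
        FileInputFormat.addInputPath(job, new Path(args[1]));   // HDFS location B
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // merged output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}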

What bothers me are the following facts:
1) All the mappers finish without error.
2) Most of the reducers finish without error, but one reducer keeps failing 
with the above error.
3) It looks like it is caused by the data (a standalone read check is sketched 
after the schema below), but keep in mind that all the Avro records pass the 
mapper side and only one reducer fails.
4) From the stack trace, it looks like our reducer code was NOT invoked yet; 
the task fails before that. So my guess is that all the Avro records pass 
through the mapper side, but Avro complains about the intermediate result 
generated by one mapper. In my understanding, that is a sequence file generated 
by Hadoop, with the Avro bytes in the value part. Does the above error mean 
that Avro cannot deserialize the value part of that sequence file?
5) Our ETL ran fine for more than one year, then suddenly started hitting this 
error one day and has kept hitting it ever since.
6) If it helps, here is the schema for the Avro record:
{
    "namespace" : "company name",
    "type" : "record",
    "name" : "Lists",
    "fields" : [
        {"name" : "account_id", "type" : "long"},
        {"name" : "list_id", "type" : "string"},
        {"name" : "sequence_id", "type" : ["int", "null"]} ,
        {"name" : "name", "type" : ["string", "null"]},
        {"name" : "state", "type" : ["string", "null"]},
        {"name" : "description", "type" : ["string", "null"]},
        {"name" : "dynamic_filtered_list", "type" : ["int", "null"]},
        {"name" : "filter_criteria", "type" : ["string", "null"]},
        {"name" : "created_at", "type" : ["long", "null"]},
        {"name" : "updated_at", "type" : ["long", "null"]},
        {"name" : "deleted_at", "type" : ["long", "null"]},
        {"name" : "favorite", "type" : ["int", "null"]},
        {"name" : "delta", "type" : ["boolean", "null"]},
        {
            "name" : "list_memberships", "type" : {
                "type" : "array", "items" : {
                    "name" : "ListMembership", "type" : "record",
                    "fields" : [
                        {"name" : "channel_id", "type" : "string"},
                        {"name" : "created_at", "type" : ["long", "null"]},
                        {"name" : "created_source", "type" : ["string", 
"null"]},
                        {"name" : "deleted_at", "type" : ["long", "null"]},
                        {"name" : "sequence_id", "type" : ["long", "null"]}
                    ]
                }
            }
        }
    ]
}
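
To rule out corruption in the Avro container files that feed the job (see fact 3 
above), a minimal standalone check can be run against a part file copied out of 
HDFS, along these lines (a sketch; the class name and the way the file path is 
passed in are illustrative):

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class AvroReadCheck {
    public static void main(String[] args) throws Exception {
        // args[0]: one Avro part file from HDFS location A or B, copied to the local filesystem
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
        DataFileReader<GenericRecord> fileReader =
                new DataFileReader<GenericRecord>(new File(args[0]), datumReader);
        long count = 0;
        try {
            while (fileReader.hasNext()) {
                fileReader.next();   // fully decodes each record; throws if the data cannot be read
                count++;
            }
        } finally {
            fileReader.close();
        }
        System.out.println("decoded " + count + " records without error");
    }
}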

  was:
We are using the Avro 1.7.4 that comes with IBM BigInsight V3.0.0.2, which also 
includes Apache Hadoop 2.2.0.

There is a dataset stored in Avro format, and there is also a daily delta of 
changes to this dataset, ingested into HDFS in CSV format. The ETL transforms 
the CSV data into Avro format in a 1st MR job, then merges the new Avro records 
with the existing Avro records in a 2nd MR job; the final output is a new merged 
dataset, stored in Avro format in HDFS and ready to use.

This worked for over a year until, suddenly, a strange exception was thrown from 
the 2nd MR job. Here is the stack trace from the MR job:

java.lang.IndexOutOfBoundsException
        at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:191)
        at java.io.DataInputStream.read(DataInputStream.java:160)
        at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:143)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:125)
        at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:121)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
        at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:108)
        at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:48)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:142)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:117)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:297)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:165)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(AccessController.java:366)
        at javax.security.auth.Subject.doAs(Subject.java:572)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1502)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

It is thrown from the reducer. In this case, the input of the MR job actually 
contains 2 parts: one is the new Avro records generated from the delta; the 
other is the existing latest Avro records from before the delta. Our reducer 
code does not appear to have been invoked yet, but Avro failed to read the Avro 
records, and I am not 100% sure which of the above 2 parts it complains about.

When I first faced this error, I thought the new delta contained some data 
causing the issue, but after testing different cases it looks more like Avro 
cannot read the data it generated itself. The stack trace suggests the final 
Avro data is corrupted, but I don't know why or how.
My test cases are the following:

1) Day1 data + day2 delta -> merged without issue, and generated day2 data
2) Day2 data + day3 delta -> cannot merge; got the exception in the MR job
3) Day1 data + day2 delta + day3 delta -> merged without issue and generated 
day3 data, but after that it cannot be merged with the new incoming delta.

I tried Avro 1.7.7, which looks like the latest release so far, but I still get 
this error. I believe some data may be triggering this bug, but the data in the 
CSV files all looks fine.

Here is the Avro schema we defined for this dataset:

{
    "namespace" : "xxxxxx",
    "type" : "record",
    "name" : "Lists",
    "fields" : [
        {"name" : "account_id", "type" : "long"},
        {"name" : "list_id", "type" : "string"},
        {"name" : "sequence_id", "type" : ["int", "null"]} ,
        {"name" : "name", "type" : ["string", "null"]},
        {"name" : "state", "type" : ["string", "null"]},
        {"name" : "description", "type" : ["string", "null"]},
        {"name" : "dynamic_filtered_list", "type" : ["int", "null"]},
        {"name" : "filter_criteria", "type" : ["string", "null"]},
        {"name" : "created_at", "type" : ["long", "null"]},
        {"name" : "updated_at", "type" : ["long", "null"]},
        {"name" : "deleted_at", "type" : ["long", "null"]},
        {"name" : "favorite", "type" : ["int", "null"]},
        {"name" : "delta", "type" : ["boolean", "null"]},
        {
            "name" : "list_memberships", "type" : {
                "type" : "array", "items" : {
                    "name" : "ListMembership", "type" : "record",
                    "fields" : [
                        {"name" : "channel_id", "type" : "string"},
                        {"name" : "created_at", "type" : ["long", "null"]},
                        {"name" : "created_source", "type" : ["string", 
"null"]},
                        {"name" : "deleted_at", "type" : ["long", "null"]},
                        {"name" : "sequence_id", "type" : ["long", "null"]}
                    ]
                }
            }
        }
    ]
}


> Strange IndexOutofBoundException in GenericDatumReader.readString
> -----------------------------------------------------------------
>
>                 Key: AVRO-1786
>                 URL: https://issues.apache.org/jira/browse/AVRO-1786
>             Project: Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.7.4
>         Environment: CentOS 6.5 Linux x64, 2.6.32-358.14.1.el6.x86_64
> Use IBM JVM:
> IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed References 
> 20140515_199835 (JIT enabled, AOT enabled)
>            Reporter: Yong Zhang
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
