[ https://issues.apache.org/jira/browse/AVRO-1786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122312#comment-15122312 ]

Yong Zhang commented on AVRO-1786:
----------------------------------

Hi, Ryan:

First, I think this bug's priority should be Minor, as it is a very rare case, so I changed it.

In this test, I modified the class
https://github.com/apache/hadoop/blob/release-2.2.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
and added the following lines right after line 144:
{code}
if (key.toString().contains("xxxxx")) {
  System.out.println("current key length = " + (nextKey.getLength() - nextKey.getPosition()));
  System.out.println("current value length = " + (nextVal.getLength() - nextVal.getPosition()));
}
{code}

In the above code, "xxxxx" is the key that I know is hit in the bad case; on the next line, "value = valueDeserializer.deserialize(value)" will throw the exception. I want to check whether the value's length in bytes changes between the two cases.
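
For context: nextKey and nextVal are the DataInputBuffer instances that nextKeyValue() reads the serialized key and value from, just before the failing valueDeserializer.deserialize(value) call, so getLength() - getPosition() is the number of serialized bytes about to be deserialized. Here is a tiny standalone illustration of what the two prints measure (the byte array is just a stand-in, not real shuffle data):

{code}
import org.apache.hadoop.io.DataInputBuffer;

// Standalone illustration of the arithmetic in the debug prints above;
// the array below is a placeholder, not real serialized data.
public class BufferLengthDemo {
  public static void main(String[] args) {
    byte[] serializedValue = new byte[402681];          // pretend serialized Avro value
    DataInputBuffer nextVal = new DataInputBuffer();
    nextVal.reset(serializedValue, 0, serializedValue.length);
    // Unread serialized bytes = getLength() - getPosition()
    System.out.println("current value length = "
        + (nextVal.getLength() - nextVal.getPosition())); // prints 402681
  }
}
{code}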

Anyway, here are the counters in the good case, in which the mappers only emit 3 records, including the "xxxxx" record:

16/01/28 14:03:58 INFO mapred.JobClient: Job complete: job_201512111403_1109
16/01/28 14:03:58 INFO mapred.JobClient: Counters: 31
16/01/28 14:03:58 INFO mapred.JobClient:   File System Counters
16/01/28 14:03:58 INFO mapred.JobClient:     FILE: BYTES_READ=524
16/01/28 14:03:58 INFO mapred.JobClient:     FILE: BYTES_WRITTEN=1888562
16/01/28 14:03:58 INFO mapred.JobClient:     HDFS: BYTES_READ=965292334
16/01/28 14:03:58 INFO mapred.JobClient:     HDFS: BYTES_WRITTEN=1508
16/01/28 14:03:58 INFO mapred.JobClient:   org.apache.hadoop.mapreduce.JobCounter
16/01/28 14:03:58 INFO mapred.JobClient:     TOTAL_LAUNCHED_MAPS=9
16/01/28 14:03:58 INFO mapred.JobClient:     TOTAL_LAUNCHED_REDUCES=1
16/01/28 14:03:58 INFO mapred.JobClient:     DATA_LOCAL_MAPS=9
16/01/28 14:03:58 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=415253
16/01/28 14:03:58 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=24692
16/01/28 14:03:58 INFO mapred.JobClient:     FALLOW_SLOTS_MILLIS_MAPS=0
16/01/28 14:03:58 INFO mapred.JobClient:     FALLOW_SLOTS_MILLIS_REDUCES=0
16/01/28 14:03:58 INFO mapred.JobClient:   org.apache.hadoop.mapreduce.TaskCounter
16/01/28 14:03:58 INFO mapred.JobClient:     MAP_INPUT_RECORDS=234962
16/01/28 14:03:58 INFO mapred.JobClient:     MAP_OUTPUT_RECORDS=3
16/01/28 14:03:58 INFO mapred.JobClient:     MAP_OUTPUT_BYTES=822
16/01/28 14:03:58 INFO mapred.JobClient:     MAP_OUTPUT_MATERIALIZED_BYTES=717
16/01/28 14:03:58 INFO mapred.JobClient:     SPLIT_RAW_BYTES=1461
16/01/28 14:03:58 INFO mapred.JobClient:     COMBINE_INPUT_RECORDS=0
16/01/28 14:03:58 INFO mapred.JobClient:     COMBINE_OUTPUT_RECORDS=0
16/01/28 14:03:58 INFO mapred.JobClient:     REDUCE_INPUT_GROUPS=3
16/01/28 14:03:58 INFO mapred.JobClient:     REDUCE_SHUFFLE_BYTES=717
16/01/28 14:03:58 INFO mapred.JobClient:     REDUCE_INPUT_RECORDS=3
16/01/28 14:03:58 INFO mapred.JobClient:     REDUCE_OUTPUT_RECORDS=3
16/01/28 14:03:58 INFO mapred.JobClient:     SPILLED_RECORDS=6
16/01/28 14:03:58 INFO mapred.JobClient:     CPU_MILLISECONDS=346620
16/01/28 14:03:58 INFO mapred.JobClient:     PHYSICAL_MEMORY_BYTES=10232893440
16/01/28 14:03:58 INFO mapred.JobClient:     VIRTUAL_MEMORY_BYTES=43124248576
16/01/28 14:03:58 INFO mapred.JobClient:     COMMITTED_HEAP_BYTES=13987872768
16/01/28 14:03:58 INFO mapred.JobClient:   ETLCounter$CounterType
16/01/28 14:03:58 INFO mapred.JobClient:     REDUCER_OUTPUT_RECORD=3
16/01/28 14:03:58 INFO mapred.JobClient:     VALID_RECORD=3
16/01/28 14:03:58 INFO mapred.JobClient:   File Input Format Counters
16/01/28 14:03:58 INFO mapred.JobClient:     Bytes Read=965224378
16/01/28 14:03:58 INFO mapred.JobClient:   org.apache.hadoop.mapreduce.lib.output.FileOutputFormat$Counter
16/01/28 14:03:58 INFO mapred.JobClient:     BYTES_WRITTEN=1508
16/01/28 14:03:58 INFO mapreduce.Contact2ETLDeltaDriver: Merge job done!

In this case, I got the following in the log:

current key length = 34
current value length = 402681

It looks like the value's length is 402681.

Then I ran the same MR job again, but in this case the mappers emit all of the records instead of just 3, and I got the following in the log:

current key length = 34
current value length = 403167
Got ((n >>> 1) ^ -(n & 1)) = -51
And b = 101
And shift = 0
And n = 101

So in this case, for the same Avro record shipped from the mapper to the reducer, the length of the value changes to 403167, and the readInt method ends up returning -51.

Even though I am not 100% sure how the intermediate data of an MR job is generated, for the same AVRO record, shouldn't the value's length be the same at the ReduceContextImpl stage? Why does the length change? Is this correct?

> Strange IndexOutofBoundException in GenericDatumReader.readString
> -----------------------------------------------------------------
>
>                 Key: AVRO-1786
>                 URL: https://issues.apache.org/jira/browse/AVRO-1786
>             Project: Avro
>          Issue Type: Bug
>          Components: java
>    Affects Versions: 1.7.4, 1.7.7
>         Environment: CentOS 6.5 Linux x64, 2.6.32-358.14.1.el6.x86_64
> Use IBM JVM:
> IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed References 
> 20140515_199835 (JIT enabled, AOT enabled)
>            Reporter: Yong Zhang
>            Priority: Minor
>
> Our production cluster is CentOS 6.5 (2.6.32-358.14.1.el6.x86_64), running
> IBM BigInsight V3.0.0.2. In Apache terms, it is Hadoop 2.2.0 with MRv1 (no
> YARN), and comes with AVRO 1.7.4, running on IBM J9 VM (build 2.7, JRE 1.7.0
> Linux amd64-64 Compressed References 20140515_199835, JIT enabled, AOT
> enabled). Not sure if the JDK matters, but it is NOT the Oracle JVM.
> We have an ETL implemented as a chain of MR jobs. One MR job merges 2 sets
> of AVRO data: Dataset1 is in HDFS location A, Dataset2 is in HDFS location B,
> and both contain AVRO records bound to the same AVRO schema. Each record
> contains a unique id field and a timestamp field. The MR job merges the
> records by ID, keeps the record with the later timestamp in place of the one
> with the earlier timestamp, and emits the final AVRO record. Very
> straightforward.
> Now we face a problem: one reducer keeps failing with the following
> stacktrace on the JobTracker:
> {code}
> java.lang.IndexOutOfBoundsException
>       at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:191)
>       at java.io.DataInputStream.read(DataInputStream.java:160)
>       at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>       at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>       at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>       at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:143)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:125)
>       at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:121)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>       at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>       at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:108)
>       at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:48)
>       at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:142)
>       at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:117)
>       at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:297)
>       at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:165)
>       at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
>       at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>       at java.security.AccessController.doPrivileged(AccessController.java:366)
>       at javax.security.auth.Subject.doAs(Subject.java:572)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1502)
>       at org.apache.hadoop.mapred.Child.main(Child.java:249)
> {code}
> Here are my Mapper and Reducer method signatures:
> Mapper:
> {code}
> public void map(AvroKey<SpecificRecord> key, NullWritable value, Context context) throws IOException, InterruptedException
> {code}
> Reducer:
> {code}
> protected void reduce(CustomPartitionKeyClass key, Iterable<AvroValue<SpecificRecord>> values, Context context) throws IOException, InterruptedException
> {code}
> What bothers me are the following facts:
> 1) All the mappers finish without error.
> 2) Most of the reducers finish without error, but one reducer keeps failing
> with the above error.
> 3) It looks like it is caused by the data? But keep in mind that all the avro
> records passed the mapper side, yet they failed in one reducer.
> 4) From the stacktrace, it looks like our reducer code was NOT invoked yet;
> the task failed before reaching it. So my guess is that all the AVRO records
> pass through the mapper side, but AVRO complains about the intermediate
> result generated by one mapper? In my understanding, that will be a Sequence
> file generated by Hadoop, and the value part will be the AVRO bytes. Does the
> above error mean that AVRO cannot deserialize the value part from the
> sequence file?
> 5) Our ETL ran fine for more than one year, but we suddenly got this error
> starting from one day, and have kept getting it since.
> 6) If it helps, here is the schema for the avro record:
> 6) If it helps, here is the schema for the avro record:
> {code}
> {
>     "namespace" : "company name",
>     "type" : "record",
>     "name" : "Lists",
>     "fields" : [
>         {"name" : "account_id", "type" : "long"},
>         {"name" : "list_id", "type" : "string"},
>         {"name" : "sequence_id", "type" : ["int", "null"]} ,
>         {"name" : "name", "type" : ["string", "null"]},
>         {"name" : "state", "type" : ["string", "null"]},
>         {"name" : "description", "type" : ["string", "null"]},
>         {"name" : "dynamic_filtered_list", "type" : ["int", "null"]},
>         {"name" : "filter_criteria", "type" : ["string", "null"]},
>         {"name" : "created_at", "type" : ["long", "null"]},
>         {"name" : "updated_at", "type" : ["long", "null"]},
>         {"name" : "deleted_at", "type" : ["long", "null"]},
>         {"name" : "favorite", "type" : ["int", "null"]},
>         {"name" : "delta", "type" : ["boolean", "null"]},
>         {
>             "name" : "list_memberships", "type" : {
>                 "type" : "array", "items" : {
>                     "name" : "ListMembership", "type" : "record",
>                     "fields" : [
>                         {"name" : "channel_id", "type" : "string"},
>                         {"name" : "created_at", "type" : ["long", "null"]},
>                         {"name" : "created_source", "type" : ["string", "null"]},
>                         {"name" : "deleted_at", "type" : ["long", "null"]},
>                         {"name" : "sequence_id", "type" : ["long", "null"]}
>                     ]
>                 }
>             }
>         }
>     ]
> }
> {code}



