[ 
https://issues.apache.org/jira/browse/CRUNCH-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13859749#comment-13859749
 ] 

Micah Whitacre commented on CRUNCH-316:
---------------------------------------

Would reusing the BinaryEncoder (e.g. BufferedBinaryEncoder) also be an option? 
 If, in open(), we created the EncoderFactory but got a new Encoder on each call 
to serialize(), it looks like the BufferedBinaryEncoder would be reconfigured 
and therefore flushed [1] (note: I haven't tested this).

[1] - 
https://github.com/apache/avro/blob/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java#L58
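
Concretely, the suggestion might look something like the sketch below (untested, 
as noted above; the class and field names are illustrative, not 
SafeAvroSerialization's actual members). The key call is 
EncoderFactory.binaryEncoder(out, reuse): passing the previous encoder back in 
as the reuse argument reconfigures it.

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

// Untested sketch of the idea above; names are hypothetical.
class ReusingSerializer<T> {
  private final EncoderFactory factory = EncoderFactory.get(); // created in open()
  private BinaryEncoder encoder;  // null before the first record
  private OutputStream out;       // the map output stream, set in open()
  private DatumWriter<T> writer;  // writes the Avro datum

  void serialize(T t) throws IOException {
    // Reconfiguring the reused BufferedBinaryEncoder flushes any bytes
    // still sitting in its buffer from a previous record [1].
    encoder = factory.binaryEncoder(out, encoder);
    writer.write(t, encoder);
    encoder.flush();
  }
}
```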

> Data Corruption when DatumWriter.write() throws MapBufferTooSmallException 
> when called by SafeAvroSerialization
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-316
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-316
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>            Reporter: Ben Roling
>            Assignee: Josh Wills
>         Attachments: ArrayIndexOutOfBoundsException.txt
>
>
> Recently we encountered an issue when processing a crunch join with a large 
> Avro record.  The job was failing in the reduce phase with the attached 
> ArrayIndexOutOfBoundsException deserializing an Avro record.
> One of the first things I noticed when looking into the problem was the 
> following message:
> 2013-12-31 10:33:02,489 INFO  [pool-1-thread-1] 
> org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer: 
> 99615133 bytes
> The message indicates a record is too large to fit in the sort buffer (per 
> io.sort.mb -- which defaults to 100MB).  I increased io.sort.mb and the 
> problem went away, but I was curious to figure out the root cause of the 
> issue.
> After some lengthy debugging, I was able to figure out that the problem is in 
> SafeAvroSerialization.  When a record is too large to fit in the sort buffer, 
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws 
> MapBufferTooSmallException.  This exception is handled in MapTask.collect() 
> by spilling the record to disk.  The problem is that the 
> BufferedBinaryEncoder used by SafeAvroSerialization is never flushed, so the 
> next record to be processed is corrupted: data still in the buffer from the 
> previous record gets flushed into the new record.
> I was able to prove further to myself that this was the problem by leaving 
> io.sort.mb at default and modifying SafeAvroSerialization to use a 
> DirectBinaryEncoder instead of a BufferedBinaryEncoder.
> It could be argued that the problem is actually in MapTask and the way it 
> handles the exception.  Perhaps it should discard the key and value 
> serializers and get new ones when handling this exception.  Doing that would 
> acknowledge that Serializers might be stateful, as SafeAvroSerialization is.  
> I don't see any documentation suggesting they must be stateless.
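
The failure mode described in the report can be reproduced with plain java.io, 
no Avro or Hadoop involved; here BufferedOutputStream stands in for the 
BufferedBinaryEncoder that SafeAvroSerialization keeps open across records (a 
simplified model, not the actual Crunch code path):

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified model of the corruption: a buffered encoder whose internal
// buffer is never flushed after a failed write leaks the previous
// record's bytes into the next record.
public class StaleBufferDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Stands in for the BufferedBinaryEncoder reused across records.
        BufferedOutputStream encoder = new BufferedOutputStream(sink, 64);

        // Record 1 is serialized into the buffer, but downstream collect()
        // throws (MapBufferTooSmallException in MapTask), so flush() is
        // never called and the bytes stay buffered.
        encoder.write("record-1".getBytes(StandardCharsets.UTF_8));
        // ... exception handled by spilling record 1 via another path ...

        // Record 2 goes through the same encoder; the stale record-1
        // bytes are flushed ahead of it into the stream.
        encoder.write("record-2".getBytes(StandardCharsets.UTF_8));
        encoder.flush();

        // Only "record-2" was expected here -- this is the corruption
        // the reducer later trips over.
        System.out.println(sink.toString("UTF-8")); // prints "record-1record-2"
    }
}
```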



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)