[ https://issues.apache.org/jira/browse/CRUNCH-316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Micah Whitacre updated CRUNCH-316:
----------------------------------

    Attachment: CRUNCH-316.patch

Attaching a patch with the change to use DirectBinaryEncoder.  [~ben.roling] 
will more than likely want this backported to 0.8.x.
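
For reference, a minimal sketch of the kind of change involved (illustrative 
only; the class and method names here are assumed, not copied from the patch, 
and only the Avro EncoderFactory calls are real API):

{code:java}
import java.io.OutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Illustrative serializer fragment; not the actual SafeAvroSerialization code.
public class EncoderChoiceSketch {
  private BinaryEncoder encoder;

  public void open(OutputStream out) {
    // Before: a buffered encoder that holds bytes internally until flush().
    // encoder = EncoderFactory.get().binaryEncoder(out, null);

    // After: a DirectBinaryEncoder writes straight through to the stream, so
    // an exception from the underlying MapTask buffer leaves no stale bytes
    // behind to leak into the next record.
    encoder = EncoderFactory.get().directBinaryEncoder(out, null);
  }
}
{code}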

> Data Corruption when DatumWriter.write() throws MapBufferTooSmallException 
> when called by SafeAvroSerialization
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-316
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-316
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.9.0, 0.8.2
>            Reporter: Ben Roling
>            Assignee: Josh Wills
>         Attachments: ArrayIndexOutOfBoundsException.txt, CRUNCH-316.patch
>
>
> Recently we encountered an issue when processing a Crunch join with a large 
> Avro record.  The job was failing in the reduce phase with the attached 
> ArrayIndexOutOfBoundsException while deserializing an Avro record.
> One of the first things I noticed when looking into the problem was the 
> following message:
> 2013-12-31 10:33:02,489 INFO  [pool-1-thread-1] 
> org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer: 
> 99615133 bytes
> The message indicates a record is too large to fit in the sort buffer (per 
> io.sort.mb -- which defaults to 100MB).  I increased io.sort.mb and the 
> problem went away, but I was curious to figure out the root cause of the 
> issue.
> After some lengthy debugging, I was able to figure out that the problem is in 
> SafeAvroSerialization.  When a record is too large to fit in the sort buffer, 
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws 
> MapBufferTooSmallException.  This exception is handled in MapTask.collect() 
> by spilling the record to disk.  The problem is that the BufferedBinaryEncoder 
> used by SafeAvroSerialization is never flushed or reset, so when the next 
> record is processed, leftover bytes from the previous record are flushed into 
> the new record and corrupt it.
> I further confirmed this was the problem by leaving io.sort.mb at its default 
> and modifying SafeAvroSerialization to use a DirectBinaryEncoder instead of a 
> BufferedBinaryEncoder.
> It could be argued that the problem is actually in MapTask and the way it 
> handles the exception.  Perhaps it should discard the key and value 
> serializers and obtain new ones when handling this exception.  Doing that 
> would acknowledge that Serializers may be stateful, as SafeAvroSerialization 
> is.  I don't see any documentation suggesting they must be stateless.
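
To make the statefulness concrete, here is a minimal standalone sketch (plain 
Avro, not Crunch or MapTask code; the scenario is assumed) of how unflushed 
bytes in a buffered encoder bleed into the following record:

{code:java}
import java.io.ByteArrayOutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class StaleBufferSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    // Buffered encoder: bytes accumulate internally until flush().
    BinaryEncoder buffered = EncoderFactory.get().binaryEncoder(sink, null);

    // First record: its bytes sit in the encoder's internal buffer.
    buffered.writeString("record-1");
    // Suppose the caller abandons this record here, the way MapTask.collect()
    // moves on after MapBufferTooSmallException, without flushing or resetting.

    // Second record: the stale "record-1" bytes are still buffered and get
    // flushed out ahead of it, so a reader expecting only "record-2" sees
    // corrupt leading bytes.
    buffered.writeString("record-2");
    buffered.flush();

    System.out.println("bytes in sink: " + sink.size()); // includes record-1
  }
}
{code}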



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
