[ https://issues.apache.org/jira/browse/CRUNCH-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13860243#comment-13860243 ]

Ben Roling commented on CRUNCH-316:
-----------------------------------

Sorry for the delayed response -- I was offline a bit for the holidays.  Yes, 
I believe using DirectBinaryEncoder solves the issue -- with that change the 
ArrayIndexOutOfBoundsException is gone.  The only test case I have at the 
moment is the original data that generated the issue.  The expected result of 
the join for that data is actually an empty PTable, and with the patch that 
is what I get.  Ideally I would have a test case that shows a non-empty join 
result with the expected (non-corrupt) output, but I don't have one at the 
moment.  I will see if I can whip one up.
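
In the meantime, here is roughly what the change amounts to on the serialize 
side.  This is just a sketch for discussion, not the actual patch -- the 
class and field names are mine; only the Avro EncoderFactory calls are the 
real API:

{code:java}
import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

// Hypothetical stand-in for the serializer inside SafeAvroSerialization.
class AvroSerializerSketch<T> {
  private final DatumWriter<T> writer;
  private BinaryEncoder encoder;

  AvroSerializerSketch(DatumWriter<T> writer) {
    this.writer = writer;
  }

  void open(OutputStream out) {
    // Before: EncoderFactory.get().binaryEncoder(out, null), whose internal
    // buffer survives a MapBufferTooSmallException and leaks stale bytes
    // into the next record.
    // After: the direct encoder writes through to 'out' immediately, so
    // there is no private state to leak.
    this.encoder = EncoderFactory.get().directBinaryEncoder(out, null);
  }

  void serialize(T t) throws IOException {
    writer.write(t, encoder);
    encoder.flush(); // with the direct encoder there is nothing buffered to lose here
  }
}
{code}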

I agree with Gabriel that it looks like the backing OutputStream is always 
buffered, so there shouldn't be a penalty for using DirectBinaryEncoder.
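
For anyone following along, the stale-buffer mechanism is easy to see in 
isolation.  This is just an illustration against a plain ByteArrayOutputStream, 
not the actual MapTask code path:

{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class BufferedEncoderDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);

    // Record 1 goes into the encoder's internal buffer, not the stream --
    // analogous to the state after MapTask aborts a write with
    // MapBufferTooSmallException before the encoder is flushed.
    enc.writeString("record-1");
    System.out.println("stream bytes after record 1: " + out.size()); // 0

    // The same encoder is reused for the next record, so record 1's stale
    // bytes are flushed out ahead of record 2, corrupting it.
    enc.writeString("record-2");
    enc.flush();
    System.out.println("stream bytes after record 2 + flush: " + out.size());
  }
}
{code}

As I understand it, with the direct encoder every byte goes straight through 
to MapTask's buffer, which already knows how to roll back a partially-written 
record when it spills; it is the bytes hiding in the encoder's private buffer 
that it cannot see.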

Thanks [~mkwhitacre] for beating me to the punch with the super-complicated 
patch :)

Taking this back to 0.8.x would be appreciated.

> Data Corruption when DatumWriter.write() throws MapBufferTooSmallException 
> when called by SafeAvroSerialization
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-316
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-316
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.9.0, 0.8.2
>            Reporter: Ben Roling
>            Assignee: Josh Wills
>         Attachments: ArrayIndexOutOfBoundsException.txt, CRUNCH-316.patch
>
>
> Recently we encountered an issue when processing a Crunch join with a large 
> Avro record.  The job was failing in the reduce phase with the attached 
> ArrayIndexOutOfBoundsException while deserializing an Avro record.
> One of the first things I noticed when looking into the problem was the 
> following message:
> 2013-12-31 10:33:02,489 INFO  [pool-1-thread-1] 
> org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer: 
> 99615133 bytes
> The message indicates a record is too large to fit in the sort buffer (per 
> io.sort.mb -- which defaults to 100MB).  I increased io.sort.mb and the 
> problem went away, but I was curious to figure out the root cause of the 
> issue.
> After some lengthy debugging, I was able to figure out that the problem is in 
> SafeAvroSerialization.  When a record is too large to fit in the sort buffer, 
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws 
> MapBufferTooSmallException.  This exception is handled in MapTask.collect() 
> by spilling the record to disk.  The problem is that the 
> BufferedBinaryEncoder used by SafeAvroSerialization is never flushed, so 
> stale data from the previous record remains in its buffer and gets flushed 
> into the next record, corrupting it.
> I was able to prove further to myself that this was the problem by leaving 
> io.sort.mb at default and modifying SafeAvroSerialization to use a 
> DirectBinaryEncoder instead of a BufferedBinaryEncoder.
> It could be argued that the problem is actually in MapTask with the way it is 
> handling the exception.  Perhaps it should discard the key and value 
> serializers and get new ones when handling this exception.  Doing that would 
> acknowledge that the Serializers might be stateful like 
> SafeAvroSerialization.  I don't see any documentation that suggests they must 
> be stateless.



