[
https://issues.apache.org/jira/browse/CRUNCH-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13859848#comment-13859848
]
Gabriel Reid commented on CRUNCH-316:
-------------------------------------
I don't think that creating a new BufferedBinaryEncoder (passing the previous
one in for reuse) after each call to AvroWrapperSerializer#serialize would
work. I took a look through the code path taken when a BufferedBinaryEncoder
is created with reuse, and there is a call to flush() in there, which I'm
guessing would cause the MapBufferTooSmallException to be thrown again.
As far as I can see, the underlying OutputStream is always buffered anyway, so
I don't think there'll be any real drawback to using a DirectBinaryEncoder.
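To make that concrete, here's a minimal sketch of what the serializer's
open/serialize path could look like with a direct encoder. This is not the
actual Crunch code -- the class shape and field names are just for
illustration -- but directBinaryEncoder() is the real EncoderFactory method:

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.mapred.AvroWrapper;

    class DirectEncodingSerializer<T> {
      private final DatumWriter<T> writer;
      private BinaryEncoder encoder;

      DirectEncodingSerializer(DatumWriter<T> writer) {
        this.writer = writer;
      }

      void open(OutputStream out) {
        // A direct encoder writes straight through to 'out', so a failed
        // write can never leave stale bytes behind in the encoder itself.
        encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
      }

      void serialize(AvroWrapper<T> wrapper) throws IOException {
        writer.write(wrapper.datum(), encoder);
        // No encoder-level buffer to flush; any buffering happens in the
        // underlying (already buffered) OutputStream.
      }
    }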
> Data Corruption when DatumWriter.write() throws MapBufferTooSmallException
> when called by SafeAvroSerialization
> ---------------------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-316
> URL: https://issues.apache.org/jira/browse/CRUNCH-316
> Project: Crunch
> Issue Type: Bug
> Components: Core
> Reporter: Ben Roling
> Assignee: Josh Wills
> Attachments: ArrayIndexOutOfBoundsException.txt
>
>
> Recently we encountered an issue when processing a Crunch join with a large
> Avro record. The job was failing in the reduce phase with the attached
> ArrayIndexOutOfBoundsException while deserializing an Avro record.
> One of the first things I noticed when looking into the problem was the
> following message:
> 2013-12-31 10:33:02,489 INFO [pool-1-thread-1]
> org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer:
> 99615133 bytes
> The message indicates a record is too large to fit in the sort buffer (per
> io.sort.mb -- which defaults to 100MB). I increased io.sort.mb and the
> problem went away, but I was curious to figure out the root cause of the
> issue.
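> For anyone else hitting this before a fix lands, the workaround in code form
> is just a config bump -- a trivial sketch, assuming the old-style property
> name used by this Hadoop version:
>
>     import org.apache.hadoop.conf.Configuration;
>
>     public class SortBufferWorkaround {
>       public static void main(String[] args) {
>         Configuration conf = new Configuration();
>         // Default is 100 (MB); the failing record serialized to ~95MB, so
>         // give the map-side sort buffer enough headroom to hold it.
>         conf.setInt("io.sort.mb", 200);
>       }
>     }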
> After some lengthy debugging, I was able to figure out that the problem is in
> SafeAvroSerialization. When a record is too large to fit in the sort buffer,
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws
> MapBufferTooSmallException. This exception is handled in MapTask.collect()
> by spilling the record to disk. The problem is that the BufferedBinaryEncoder
> used by SafeAvroSerialization is never flushed after the failure, so data
> from the failed record is still sitting in its buffer and gets flushed into
> the next record that is processed, corrupting it.
> I was able to further confirm this was the problem by leaving io.sort.mb at
> its default and modifying SafeAvroSerialization to use a DirectBinaryEncoder
> instead of a BufferedBinaryEncoder.
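> The failure mode is easy to reproduce in isolation with any buffered stream.
> Here is a minimal self-contained sketch -- java.io.BufferedOutputStream
> stands in for the buffered encoder, and the rejecting stream stands in for
> MapTask's in-memory buffer; none of this is the actual Hadoop code:
>
>     import java.io.BufferedOutputStream;
>     import java.io.ByteArrayOutputStream;
>     import java.io.IOException;
>     import java.io.OutputStream;
>
>     public class StaleBufferDemo {
>       // Stand-in for MapTask's sort buffer rejecting an oversized record.
>       static class RejectingStream extends OutputStream {
>         boolean reject = true;
>         final ByteArrayOutputStream sink = new ByteArrayOutputStream();
>         @Override public void write(int b) throws IOException {
>           if (reject) throw new IOException("record too large");
>           sink.write(b);
>         }
>       }
>
>       public static void main(String[] args) throws IOException {
>         RejectingStream out = new RejectingStream();
>         BufferedOutputStream buffered = new BufferedOutputStream(out);
>
>         buffered.write("record-A".getBytes("UTF-8")); // sits in the buffer
>         try {
>           buffered.flush(); // underlying write throws; buffer keeps record-A
>         } catch (IOException expected) {
>           // MapTask spills the record via a separate path, but nothing
>           // resets this buffer
>         }
>
>         out.reject = false;
>         buffered.write("record-B".getBytes("UTF-8"));
>         buffered.flush();
>         // Prints "record-Arecord-B": stale bytes from the failed record
>         // land in front of the next one.
>         System.out.println(out.sink.toString("UTF-8"));
>       }
>     }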
> It could be argued that the problem is actually in MapTask and the way it
> handles this exception. Perhaps it should discard the key and value
> serializers and get new ones when handling it; doing that would acknowledge
> that Serializers may be stateful, as SafeAvroSerialization is. I don't see
> any documentation suggesting they must be stateless.
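> Roughly, the shape of that alternative fix would be a collector that throws
> the serializer away on the failure path. A generic sketch, not the real
> MapTask code -- Serializer and SerializationFactory are the actual
> org.apache.hadoop.io.serializer types, everything else here is made up:
>
>     import java.io.ByteArrayOutputStream;
>     import java.io.IOException;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.io.serializer.SerializationFactory;
>     import org.apache.hadoop.io.serializer.Serializer;
>
>     public class RecreatingCollector<T> {
>       private final SerializationFactory factory;
>       private final Class<T> clazz;
>       private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
>       private Serializer<T> serializer;
>
>       public RecreatingCollector(Configuration conf, Class<T> clazz)
>           throws IOException {
>         this.factory = new SerializationFactory(conf);
>         this.clazz = clazz;
>         openSerializer();
>       }
>
>       private void openSerializer() throws IOException {
>         serializer = factory.getSerializer(clazz);
>         serializer.open(buffer);
>       }
>
>       public void collect(T record) throws IOException {
>         try {
>           serializer.serialize(record);
>         } catch (IOException tooLarge) {
>           // Spill 'record' through a separate path here (as MapTask does),
>           // then rebuild the serializer so that no stale state -- e.g. a
>           // half-written record in a buffered encoder -- can leak into the
>           // next record.
>           serializer.close();
>           buffer.reset();
>           openSerializer();
>         }
>       }
>     }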
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)