[ https://issues.apache.org/jira/browse/CRUNCH-316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Micah Whitacre updated CRUNCH-316:
----------------------------------

    Attachment: CRUNCH-316.patch

Patch with the change to use DirectBinaryEncoder. [~ben.roling] will most likely want this backported to 0.8.x.

> Data Corruption when DatumWriter.write() throws MapBufferTooSmallException when called by SafeAvroSerialization
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-316
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-316
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.9.0, 0.8.2
>            Reporter: Ben Roling
>            Assignee: Josh Wills
>         Attachments: ArrayIndexOutOfBoundsException.txt, CRUNCH-316.patch
>
> Recently we encountered an issue when processing a Crunch join with a large Avro record. The job was failing in the reduce phase with the attached ArrayIndexOutOfBoundsException while deserializing an Avro record.
>
> One of the first things I noticed when looking into the problem was the following message:
>
> 2013-12-31 10:33:02,489 INFO [pool-1-thread-1] org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer: 99615133 bytes
>
> The message indicates that a record is too large to fit in the sort buffer (sized by io.sort.mb, which defaults to 100MB). I increased io.sort.mb and the problem went away, but I was curious to find the root cause of the issue.
>
> After some lengthy debugging, I figured out that the problem is in SafeAvroSerialization. When a record is too large to fit in the sort buffer, org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws MapBufferTooSmallException. This exception is handled in MapTask.collect() by spilling the record to disk. The problem is that the BufferedBinaryEncoder used by SafeAvroSerialization is never flushed, so when the next record is processed, data still buffered from the previous record is flushed into the new record and corrupts it.
>
> I confirmed this was the cause by leaving io.sort.mb at its default and modifying SafeAvroSerialization to use a DirectBinaryEncoder instead of a BufferedBinaryEncoder.
>
> It could be argued that the problem is actually in MapTask and the way it handles the exception. Perhaps it should discard the key and value serializers and get new ones when handling this exception. Doing that would acknowledge that the Serializers might be stateful, as SafeAvroSerialization is. I don't see any documentation that suggests they must be stateless.
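
To make the failure mode concrete, here is a minimal sketch of the serializer side of SafeAvroSerialization, showing the buffered encoder versus the direct encoder. This is not the attached CRUNCH-316.patch and not the actual Crunch source; the class and method shapes only mirror Hadoop's Serializer interface, and only the Avro EncoderFactory and DatumWriter calls are real API.

{code:java}
// Illustrative sketch only (not the actual SafeAvroSerialization or the
// attached patch).  Class and field names are hypothetical; the
// EncoderFactory/DatumWriter calls are the real Avro API.
import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializerSketch<T> {
  private final DatumWriter<T> writer;
  private OutputStream out;
  private BinaryEncoder encoder;

  public AvroSerializerSketch(DatumWriter<T> writer) {
    this.writer = writer;
  }

  public void open(OutputStream out) {
    this.out = out;
    // Before the fix: a buffered encoder.  If writer.write() throws
    // MapBufferTooSmallException part way through a record, the bytes already
    // sitting in this private buffer are never discarded, so they get flushed
    // ahead of the next record and corrupt it.
    // this.encoder = EncoderFactory.get().binaryEncoder(out, encoder);

    // After the fix: a direct encoder writes straight through to 'out'
    // (MapTask's sort buffer), so a failed write leaves no stale bytes
    // behind inside the serializer.
    this.encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
  }

  public void serialize(T datum) throws IOException {
    writer.write(datum, encoder); // can surface MapBufferTooSmallException from 'out'
    encoder.flush();              // never reached when write() throws
  }

  public void close() throws IOException {
    out.close();
  }
}
{code}

The direct encoder gives up Avro's internal write buffering, but it leaves no state behind in the serializer when DatumWriter.write() throws, which matters because MapTask keeps reusing the same serializer instance after catching MapBufferTooSmallException.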