Ben Roling created CRUNCH-316:
---------------------------------
Summary: Data Corruption when DatumWriter.write() throws
MapBufferTooSmallException when called by SafeAvroSerialization
Key: CRUNCH-316
URL: https://issues.apache.org/jira/browse/CRUNCH-316
Project: Crunch
Issue Type: Bug
Components: Core
Reporter: Ben Roling
Assignee: Josh Wills
Recently we encountered an issue when processing a Crunch join with a large
Avro record. The job was failing in the reduce phase with the attached
ArrayIndexOutOfBoundsException while deserializing an Avro record.
One of the first things I noticed when looking into the problem was the
following message:
2013-12-31 10:33:02,489 INFO [pool-1-thread-1]
org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer:
99615133 bytes
The message indicates a record is too large to fit in the sort buffer (sized by
io.sort.mb, which defaults to 100MB). Increasing io.sort.mb made the problem
go away, but I wanted to track down the root cause.
After some lengthy debugging, I was able to trace the problem to
SafeAvroSerialization. When a record is too large to fit in the sort buffer,
org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write() throws
MapBufferTooSmallException. MapTask.collect() handles this exception by
spilling the record to disk. The problem is that the BufferedBinaryEncoder
used by SafeAvroSerialization is never flushed or reset, so when the next
record is processed, the stale bytes still buffered from the spilled record
get flushed out ahead of it, corrupting the new record.
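The mechanism can be sketched with plain java.io, no Avro or Hadoop required (this is a stand-in, not actual Crunch or MapTask code): a BufferedOutputStream plays the role of the BufferedBinaryEncoder's internal buffer, and the "spill" path simply never resets it.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class StaleBufferDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sortBuffer = new ByteArrayOutputStream();
        // Stand-in for the encoder's internal buffer (BufferedBinaryEncoder).
        BufferedOutputStream encoder = new BufferedOutputStream(sortBuffer, 1024);

        // Record 1 is serialized into the encoder's buffer...
        encoder.write("RECORD-1".getBytes("UTF-8"));
        // ...but suppose the downstream write raised MapBufferTooSmallException
        // and MapTask spilled record 1 to disk by a separate path. Nothing
        // flushes or resets 'encoder', so "RECORD-1" still sits in its buffer.

        // Record 2 is now serialized through the same encoder:
        encoder.write("RECORD-2".getBytes("UTF-8"));
        encoder.flush();

        // The sort buffer receives record 1's stale bytes ahead of record 2:
        System.out.println(sortBuffer.toString("UTF-8"));
    }
}
```

The printed stream begins with the spilled record's leftover bytes, which is exactly the shape of corruption that later surfaces as an ArrayIndexOutOfBoundsException on deserialization.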
I confirmed this was the problem by leaving io.sort.mb at the default and
modifying SafeAvroSerialization to use a DirectBinaryEncoder instead of a
BufferedBinaryEncoder, which made the corruption disappear.
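Why the direct encoder avoids the problem can also be sketched with stdlib stand-ins (again an illustration, not the real classes): when there is no encoder-side buffer, a rejected oversized write leaves nothing behind to pollute the next record.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DirectEncoderDemo {
    // Toy stand-in for MapOutputBuffer$Buffer: rejects records over a limit.
    static class LimitedSink extends OutputStream {
        final ByteArrayOutputStream accepted = new ByteArrayOutputStream();
        final int limit;
        LimitedSink(int limit) { this.limit = limit; }
        @Override public void write(int b) throws IOException {
            write(new byte[]{(byte) b}, 0, 1);
        }
        @Override public void write(byte[] b, int off, int len) throws IOException {
            // Stand-in for MapBufferTooSmallException:
            if (len > limit) throw new IOException("record too large");
            accepted.write(b, off, len);
        }
    }

    public static void main(String[] args) throws IOException {
        LimitedSink sink = new LimitedSink(16);
        byte[] big = new byte[32]; // too large for the sink
        try {
            // Direct write: the sink throws and nothing is buffered anywhere.
            sink.write(big, 0, big.length);
        } catch (IOException expected) {
            // MapTask would spill this record to disk here.
        }
        // With no encoder-side buffer, the next record arrives clean:
        sink.write("RECORD-2".getBytes("UTF-8"));
        System.out.println(sink.accepted.toString("UTF-8"));
    }
}
```

In Avro terms, the swap amounts to obtaining the encoder from EncoderFactory's directBinaryEncoder() rather than its buffered binaryEncoder(), at the cost of the buffered encoder's write batching.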
It could be argued that the problem is actually in MapTask and the way it
handles the exception. Perhaps it should discard the key and value
serializers and obtain fresh ones when handling this exception. Doing that
would acknowledge that serializers might be stateful, as SafeAvroSerialization
is. I don't see any documentation that suggests they must be stateless.
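That alternative fix can be sketched the same way (hypothetical names; this is not MapTask code): on the failed write, abandon the stateful serializer and its buffered bytes and construct a fresh one before the next record.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FreshSerializerDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sortBuffer = new ByteArrayOutputStream();
        // Stateful "serializer": a buffered stream holding unwritten bytes.
        BufferedOutputStream serializer = new BufferedOutputStream(sortBuffer, 1024);

        serializer.write("RECORD-1".getBytes("UTF-8"));
        // Suppose collecting record 1 failed with MapBufferTooSmallException
        // and the record was spilled to disk. Instead of reusing the stateful
        // serializer, discard it and start a fresh one; the stale buffered
        // bytes are dropped along with the old instance (it is never flushed).
        serializer = new BufferedOutputStream(sortBuffer, 1024);

        serializer.write("RECORD-2".getBytes("UTF-8"));
        serializer.flush();

        // Only record 2 reaches the sort buffer:
        System.out.println(sortBuffer.toString("UTF-8"));
    }
}
```

This keeps the buffering benefit for the common case and confines the cost of resetting state to the rare spill path.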
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)