[
https://issues.apache.org/jira/browse/HADOOP-11678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106244#comment-16106244
]
BELUGA BEHR commented on HADOOP-11678:
--------------------------------------
In particular, where I see this is with the intermediate files. The writer works by
serializing the key and value into a buffer and then reading the buffer's length to
determine how much data is about to be written to the intermediate file. The
problem here is that, for Avro serialization, the serialize method writes to a
BufferedBinaryEncoder ({{EncoderFactory.get().binaryEncoder}}). The internal
buffer is flushed to the underlying stream only once a certain number of bytes
has been written, or when a single value of more than 512 bytes is written. For
smaller Avro objects, nothing is flushed.
{code:title=org.apache.hadoop.mapred.IFile.Writer<K, V>}
public void append(K key, V value) throws IOException {
  ...
  // Append the 'key'
  keySerializer.serialize(key);
  int keyLength = buffer.getLength();

  // Append the 'value'
  valueSerializer.serialize(value);
  int valueLength = buffer.getLength() - keyLength;
  ...
{code}
{code:title=org.apache.avro.hadoop.io.AvroSerializer<T>}
/** {@inheritDoc} */
@Override
public void serialize(AvroWrapper<T> avroWrapper) throws IOException {
  mAvroDatumWriter.write(avroWrapper.datum(), mAvroEncoder);
  // This would be a lot faster if the Serializer interface had a flush() method
  // and the Hadoop framework called it when needed. For now, we'll have to
  // flush on every record.
  mAvroEncoder.flush();
}
{code}
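To make the buffering concrete, here is a minimal, standalone sketch (illustrative only; the class name and the string datum are made up) comparing the buffering {{EncoderFactory#binaryEncoder}} with the non-buffering {{EncoderFactory#directBinaryEncoder}}:
{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EncoderBufferingDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bufferedOut = new ByteArrayOutputStream();
    ByteArrayOutputStream directOut = new ByteArrayOutputStream();

    // Buffering encoder: the bytes stay in the encoder's internal buffer,
    // so the wrapped stream still looks empty after the write.
    BinaryEncoder buffered = EncoderFactory.get().binaryEncoder(bufferedOut, null);
    buffered.writeString("small datum");
    System.out.println("buffering encoder, before flush: " + bufferedOut.size() + " bytes"); // 0

    // Direct encoder: the bytes reach the wrapped stream immediately.
    BinaryEncoder direct = EncoderFactory.get().directBinaryEncoder(directOut, null);
    direct.writeString("small datum");
    System.out.println("direct encoder, no flush needed: " + directOut.size() + " bytes");

    // Only an explicit flush drains the buffering encoder.
    buffered.flush();
    System.out.println("buffering encoder, after flush: " + bufferedOut.size() + " bytes");
  }
}
{code}
If the encoder did not flush inside {{serialize()}}, the {{buffer.getLength()}} reads in {{IFile.Writer#append}} would under-report the key and value lengths written to the intermediate file.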
As stated in the contract of {{org.apache.hadoop.io.serializer.Serializer<T>}}:
{quote}
Serializers are stateful, but must not buffer the output since other producers
may write to the output between calls to #serialize(Object).
{quote}
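A serializer that honours that contract has to push every datum straight through to the stream handed to {{open()}}, e.g. by using the non-buffering {{EncoderFactory#directBinaryEncoder}}. Roughly along these lines (a sketch only, with a made-up class name; not the attached patch):
{code:java}
import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.io.serializer.Serializer;

/** Illustrative only -- not the real AvroSerializer and not the attached patch. */
class DirectAvroSerializer<T> implements Serializer<T> {

  private final DatumWriter<T> datumWriter;
  private OutputStream out;
  private BinaryEncoder encoder;

  DirectAvroSerializer(DatumWriter<T> datumWriter) {
    this.datumWriter = datumWriter;
  }

  @Override
  public void open(OutputStream out) throws IOException {
    this.out = out;
    // Non-buffering encoder: every write goes straight to 'out', so other
    // producers can safely write to the stream between serialize() calls.
    this.encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
  }

  @Override
  public void serialize(T t) throws IOException {
    datumWriter.write(t, encoder);
    // The direct encoder has no internal buffer; this just flushes the wrapped stream.
    encoder.flush();
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}
{code}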
> AvroSerializer buffers output in violation of contract for Serializer
> ---------------------------------------------------------------------
>
> Key: HADOOP-11678
> URL: https://issues.apache.org/jira/browse/HADOOP-11678
> Project: Hadoop Common
> Issue Type: Bug
> Affects Versions: 2.6.0
> Reporter: Matthew Willson
> Attachments: HADOOP-11678.1.patch
>
>
> We've had issues with the deserializer running into EOFException when using
> Cascading's TupleSerialization (which delegates to other hadoop serializers
> to serialize entries within its tuples) in combination with AvroSerialization.
> Eventually tracked it down to the fact that AvroSerialization#AvroSerializer
> is buffering output (since it uses a buffering EncoderFactory#binaryEncoder
> rather than a non-buffering EncoderFactory#directBinaryEncoder):
> https://github.com/apache/hadoop/blob/e1109fb65608a668cd53dc324dadc6f63a74eeb9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java#L105
> The contract for Serializer explicitly states "Serializers ... must not
> buffer the output since other producers may write to the output between calls
> to #serialize(Object)." TupleSerialization does exactly that (write to the
> output between calls to #serialize), hence our problem.
> There's a similar problem with the AvroDeserializer too -- it uses a
> buffering binaryDecoder, and this can consume the underlying InputStream
> beyond the end of the datum it's decoding, meaning that if a different
> Deserializer is used to read the next item, it'll start off in the wrong
> place and get confused.
> Switching AvroSerializer and AvroDeserializer to use the non-buffering
> `EncoderFactory#directBinaryEncoder` and `DecoderFactory#directBinaryDecoder`
> fixes the issue for us.
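To illustrate the decoder-side point from the description above, a minimal standalone sketch (class name made up) showing that a buffering {{DecoderFactory#binaryDecoder}} reads ahead of the datum being decoded, while the non-buffering {{DecoderFactory#directBinaryDecoder}} leaves the stream positioned at the next item:
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class DecoderReadAheadDemo {
  public static void main(String[] args) throws Exception {
    // Two longs back-to-back, as two producers sharing one stream might write.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().directBinaryEncoder(baos, null);
    enc.writeLong(1L);
    enc.writeLong(2L);
    enc.flush();
    byte[] bytes = baos.toByteArray(); // two bytes total, one per long

    // Buffering decoder: decoding the first long pulls far more than one
    // datum's worth of bytes out of the stream into its internal buffer.
    ByteArrayInputStream in1 = new ByteArrayInputStream(bytes);
    BinaryDecoder buffered = DecoderFactory.get().binaryDecoder(in1, null);
    buffered.readLong();
    System.out.println("stream bytes left after buffering decoder: " + in1.available()); // 0

    // Direct decoder: only the first datum's bytes are consumed, leaving the
    // stream positioned at the second datum for the next reader.
    ByteArrayInputStream in2 = new ByteArrayInputStream(bytes);
    BinaryDecoder direct = DecoderFactory.get().directBinaryDecoder(in2, null);
    direct.readLong();
    System.out.println("stream bytes left after direct decoder: " + in2.available()); // 1
  }
}
{code}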