jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2193371587
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();
Review Comment:
@scovich 100%, and the Avro spec can definitely be tricky and nuanced.
> if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?
From my research, the answer seems to be that no extra bytes exist after an “empty record” in Avro: a record's binary encoding is just the concatenation of its field encodings, so a zero-field record occupies zero bytes. What the code should do is simply count that it has seen one record and move on.
In the [Java implementation](https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java), `GenericDatumReader.readRecord` iterates over the field list; with zero fields the loop body never executes, so no reads happen and the input position stays where it is. The outer caller still decrements the [`blockRemaining`](https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L264) counter, so the `Decoder` read loop still progresses correctly.
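To make that concrete, here's a minimal sketch (hypothetical types, not the arrow-avro API) of a block decode loop driven by the header-declared row count rather than by byte consumption, which is what lets zero-byte records still count as progress:

```rust
/// Hypothetical stand-in for a decoded Avro block: the header-declared
/// row count plus the (possibly decompressed) payload bytes.
struct Block {
    row_count: usize, // analogue of Java's `blockRemaining`
    data: Vec<u8>,
}

/// Decode every row in the block, returning how many rows were read.
fn decode_block(block: &Block) -> Result<usize, String> {
    let mut cursor = 0;
    let mut rows_decoded = 0;
    // Loop on the declared row count, not on `cursor < data.len()`, so a
    // record that encodes to zero bytes still advances the loop.
    while rows_decoded < block.row_count {
        cursor += decode_one_record(&block.data[cursor..])?;
        rows_decoded += 1;
    }
    Ok(rows_decoded)
}

/// Hypothetical per-record decoder returning the bytes consumed; a record
/// with zero fields has zero field encodings, so it consumes 0 bytes.
fn decode_one_record(_buf: &[u8]) -> Result<usize, String> {
    Ok(0)
}
```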
> what happens if the reader provides a schema with fewer fields than the file's schema?
If the schema used to write the data (in this instance the file's schema) cannot be resolved against the reader's schema per the [Avro specification's schema resolution requirements](https://avro.apache.org/docs/1.11.1/specification/#schema-resolution), then imo we should return an error for every record written with that unresolvable schema; handling the error would then be up to the caller. In the scenario you called out, though, the missing fields would be ignored and no error returned, per the specification: `if the writer’s record contains a field with a name not present in the reader’s record, the writer’s value for that field is ignored.`
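To illustrate the rule, a rough sketch of that resolution step with hypothetical `Field`/`Action` types (not arrow-avro's): writer-only fields get skipped, reader-only fields fall back to their default, and a reader-only field without a default makes the whole schema unresolvable:

```rust
struct Field {
    name: String,
    default: Option<String>, // default value as JSON text, if any
}

enum Action {
    Read,               // present in both schemas: decode normally
    Skip,               // writer-only: decode the bytes, then discard them
    UseDefault(String), // reader-only with a default: synthesize the value
}

/// Build a per-field plan, or fail if the schemas cannot be resolved.
fn resolve(writer: &[Field], reader: &[Field]) -> Result<Vec<(String, Action)>, String> {
    let mut plan = Vec::new();
    for w in writer {
        if reader.iter().any(|r| r.name == w.name) {
            plan.push((w.name.clone(), Action::Read));
        } else {
            // "the writer's value for that field is ignored"
            plan.push((w.name.clone(), Action::Skip));
        }
    }
    for r in reader {
        if !writer.iter().any(|w| w.name == r.name) {
            match &r.default {
                Some(d) => plan.push((r.name.clone(), Action::UseDefault(d.clone()))),
                // No default: per the spec, resolution fails, and the error
                // is surfaced to the caller.
                None => return Err(format!("cannot resolve reader field `{}`", r.name)),
            }
        }
    }
    Ok(plan)
}
```

Note the `Skip` case still has to decode the writer's bytes to know how many to discard, which is why skipping only works when the writer schema is known.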
> Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?
It should be, imo. From looking over the Java implementation, the writer schema must be either:
1. Explicitly provided: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L122
2. Provided in the first row (Header) of the stream: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L142

Otherwise a `MissingSchemaException` is thrown: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L42
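In Rust terms, that lookup order could be sketched like this (hypothetical helper; `avro.schema` is the standard metadata key from the Avro object container file spec):

```rust
use std::collections::HashMap;

/// Resolve the writer schema (as JSON text) from either an explicitly
/// supplied schema or the container file's header metadata, mirroring the
/// Java lookup order; failing both is the analogue of `MissingSchemaException`.
fn writer_schema(
    explicit: Option<String>,
    header_metadata: &HashMap<String, Vec<u8>>,
) -> Result<String, String> {
    if let Some(schema) = explicit {
        return Ok(schema); // case 1: caller provided the writer schema
    }
    if let Some(bytes) = header_metadata.get("avro.schema") {
        // case 2: schema JSON embedded in the file header metadata
        return String::from_utf8(bytes.clone()).map_err(|e| e.to_string());
    }
    Err("missing writer schema".to_string())
}
```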